You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/04/30 18:59:08 UTC
svn commit: r1830622 [4/6] - in /uima/uima-ducc/trunk: issuesFixed/
issuesFixed/css/ issuesFixed/images/ issuesFixed/images/logos/ target/
target/javadoc-bundle-options/ uima-ducc-pullservice/
uima-ducc-pullservice/.settings/ uima-ducc-pullservice/src/...
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.protocol.builtin;
+
+import java.io.InvalidClassException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.ducc.ps.net.iface.IMetaTask;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
+import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction.Type;
+import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction;
+import org.apache.uima.ducc.ps.net.impl.TransactionId;
+import org.apache.uima.ducc.ps.service.IService;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.errors.ServiceException;
+import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.processor.IProcessResult;
+import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
+import org.apache.uima.ducc.ps.service.protocol.INoTaskAvailableStrategy;
+import org.apache.uima.ducc.ps.service.protocol.IServiceProtocolHandler;
+import org.apache.uima.ducc.ps.service.transport.IServiceTransport;
+import org.apache.uima.ducc.ps.service.transport.TransportException;
+import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+/**
+ *
+ * This protocol handler is a Runnable
+ *
+ */
+public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
+ Logger logger = UIMAFramework.getLogger(DefaultServiceProtocolHandler.class);
+ private volatile boolean initError = false;
+ private volatile boolean running = false;
+ private IServiceTransport transport;
+ private IServiceProcessor processor;
+ private INoTaskAvailableStrategy noTaskStrategy;
+ // each process thread will count down the latch after intialization
+ private CountDownLatch initLatch;
+ // this PH will count the stopLatch down when it is about to stop. The service
+ // is the owner of this latch and awaits termination blocking in start()
+ private CountDownLatch stopLatch;
+ // each process thread block on startLatch until application calls start()
+ private CountDownLatch startLatch;
+ // reference to a service so that stop() can be called
+ private IService service;
+ // forces process threads to initialize serially
+ private ReentrantLock initLock = new ReentrantLock();
+
+ private static AtomicInteger idGenerator = new AtomicInteger();
+
+
+ private DefaultServiceProtocolHandler(Builder builder) {
+ this.initLatch = builder.initLatch;
+ this.stopLatch = builder.stopLatch;
+ this.service = builder.service;
+ this.transport = builder.transport;
+ this.processor = builder.processor;
+ this.noTaskStrategy = builder.strategy;
+ }
+
+ private void waitForAllThreadsToInitialize() {
+ try {
+ initLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ }
+
+ private void initialize() throws ServiceInitializationException {
+
+ if (initError) {
+ return;
+ }
+ // this latch blocks all process threads after initialization
+ // until application calls start()
+ startLatch = new CountDownLatch(1);
+ try {
+ // use a lock to serialize initialization one thread at a time
+ initLock.lock();
+ processor.initialize();
+ } catch (Exception e) {
+ initError = true;
+ running = false;
+ throw new ServiceInitializationException(
+ "Thread:" + Thread.currentThread().getName() + " Failed initialization", e);
+ } finally {
+
+ initLatch.countDown();
+ initLock.unlock();
+ if (!initError) {
+ // wait on startLatch
+ waitForAllThreadsToInitialize();
+ }
+ }
+ }
+
+ private IMetaTaskTransaction send(IMetaTaskTransaction transaction) throws Exception {
+ TransactionId tid;
+ if (Type.Get.equals(transaction.getType())) {
+ int major = idGenerator.addAndGet(1);
+ int minor = 0;
+
+ tid = new TransactionId(major, minor);
+ } else {
+ tid = transaction.getTransactionId();
+ // increment minor
+ tid.next();
+ }
+ transaction.setTransactionId(tid);
+ Object o = null;
+ try {
+ String body = XStreamUtils.marshall(transaction);
+ String content = transport.dispatch(body);
+ if ( content == null ) {
+ throw new TransportException("Service stopping - rejecting request");
+ }
+ o = XStreamUtils.unmarshall(content);
+
+ } catch ( Exception e) {
+ if ( !running ) {
+ System.out.println("... Not Running - throwing TransporException");
+ throw new TransportException("Service stopping - rejecting request");
+ }
+ throw e;
+ }
+ if (o instanceof IMetaTaskTransaction) {
+ return (MetaTaskTransaction) o;
+ } else {
+ throw new InvalidClassException(
+ "Expected IMetaCasTransaction - Instead Received " + o.getClass().getName());
+ }
+ }
+
+ private IMetaTaskTransaction callEnd(IMetaTaskTransaction transaction) throws Exception {
+ transaction.setType(Type.End);
+ if ( logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ProtocolHandler calling END");
+ }
+ return send(transaction);
+
+ }
+
+ private IMetaTaskTransaction callAck(IMetaTaskTransaction transaction) throws Exception {
+ transaction.setType(Type.Ack);
+ if ( logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ProtocolHandler calling ACK");
+ }
+ return send(transaction);
+ }
+
+ private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) throws Exception {
+ transaction.setType(Type.Get);
+ if ( logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ProtocolHandler calling GET");
+ }
+ return send(transaction);
+ }
+ /**
+ * Block until service start() is called
+ *
+ * @throws ServiceInitializationException
+ */
+ private void awaitStart() throws ServiceInitializationException {
+ try {
+ startLatch.await();
+ } catch(InterruptedException e ) {
+ Thread.currentThread().interrupt();
+ throw new ServiceInitializationException("Thread interrupted while awaiting start()");
+ }
+ }
+ public String call() throws ServiceInitializationException, ServiceException {
+ // we may fail in initialize() in which case the ServiceInitializationException
+ // is thrown
+ initialize();
+
+ // now wait for application to call start
+ awaitStart();
+
+ // all threads intialized, enter running state
+
+ IMetaTaskTransaction transaction = null;
+
+ if ( logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, ".............. Thread "+Thread.currentThread().getId() + " ready to process");
+ }
+
+ while (running) {
+
+ try {
+ // send GET Request
+ transaction = callGet(new MetaTaskTransaction());
+ // the code may have blocked in callGet for awhile, so check
+ // if service is still running
+ if ( !running ) {
+ break;
+ }
+ if (transaction.getMetaTask() == null || transaction.getMetaTask().getUserSpaceTask() == null ) {
+ // the client has no tasks to give.
+ noTaskStrategy.handleNoTaskSupplied();
+ continue;
+ }
+ Object task = transaction.getMetaTask().getUserSpaceTask();
+
+ // send ACK
+ transaction = callAck(transaction);
+ if (!running) {
+ break;
+ }
+ IProcessResult processResult = processor.process((String) task);
+
+ // assume success
+ Action action = Action.CONTINUE;
+ if (processResult.terminateProcess()) {
+ action = Action.TERMINATE;
+ String errorAsString = processResult.getError();
+ IMetaTask mc = transaction.getMetaTask();
+ mc.setUserSpaceException(errorAsString);
+ } else {
+ // success
+ // System.out.println("Performance Metrics:"+processResult.getResult());
+ transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
+ }
+ // send END Request
+ callEnd(transaction);
+ if (running && Action.TERMINATE.equals(action)) {
+ logger.log(Level.WARNING, "Processor Failure - Action=Terminate");
+ // Can't stop using the current thread. This thread
+ // came from a thread pool we want to stop. Need
+ // a new/independent thread to call stop()
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ delegateStop();
+ }
+ }).start();
+ running = false;
+ }
+
+
+
+ } catch( IllegalStateException e) {
+ break;
+ } catch( TransportException e) {
+ break;
+ }
+ catch (Exception e) {
+ logger.log(Level.WARNING,"",e);
+ }
+ }
+ stopLatch.countDown();
+ logger.log(Level.INFO,"ProtocolHandler terminated");
+ return String.valueOf(Thread.currentThread().getId());
+ }
+
+
+ private void delegateStop() {
+ service.stop();
+ }
+ @Override
+ public void stop() {
+ running = false;
+ if ( logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, this.getClass().getName()+" stop() called");
+ }
+ }
+ @Override
+ public void start() {
+ running = true;
+ // process threads are initialized and are awaiting latch countdown
+ startLatch.countDown();
+ }
+ @Override
+ public void setServiceProcessor(IServiceProcessor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public void setTransport(IServiceTransport transport) {
+ this.transport = transport;
+ }
+
+
+ public static class Builder {
+ private IServiceTransport transport;
+ private IServiceProcessor processor;
+ private INoTaskAvailableStrategy strategy;
+ // each thread will count down the latch
+ private CountDownLatch initLatch;
+ private CountDownLatch stopLatch;
+ private IService service;
+
+ public Builder withTransport(IServiceTransport transport) {
+ this.transport = transport;
+ return this;
+ }
+ public Builder withProcessor(IServiceProcessor processor) {
+ this.processor = processor;
+ return this;
+ }
+ public Builder withInitCompleteLatch(CountDownLatch initLatch) {
+ this.initLatch = initLatch;
+ return this;
+ }
+ public Builder withDoneLatch(CountDownLatch stopLatch) {
+ this.stopLatch = stopLatch;
+ return this;
+ }
+ public Builder withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
+ this.strategy = strategy;
+ return this;
+ }
+ public Builder withService(IService service) {
+ this.service = service;
+ return this;
+ }
+ public DefaultServiceProtocolHandler build() {
+ return new DefaultServiceProtocolHandler(this);
+ }
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.ps.service.protocol.builtin;
+
+import org.apache.uima.ducc.ps.service.protocol.INoTaskAvailableStrategy;
+
+public class NoWaitStrategy implements INoTaskAvailableStrategy {
+
+ @Override
+ public void handleNoTaskSupplied() {
+ // No Op
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/NoWaitStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.registry;
+
+import org.apache.uima.ducc.ps.service.transport.ITargetURI;
+
+public class DefaultRegistryClient implements IRegistryClient {
+ private String target;
+
+ public DefaultRegistryClient(ITargetURI targetUrl) {
+ this.target = targetUrl.asString();
+ }
+ @Override
+ public String lookUp(String currentTarget) throws RegistryNotAvailableException {
+ return target;
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/DefaultRegistryClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.registry;
+
+public interface IRegistryClient {
+ public String lookUp(String currentTarget) throws RegistryNotAvailableException;
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/IRegistryClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.registry;
+
+public class RegistryNotAvailableException extends RuntimeException{
+
+ private static final long serialVersionUID = 1L;
+ public RegistryNotAvailableException(String msg) {
+ super(msg);
+ }
+ public RegistryNotAvailableException(String msg, Exception e) {
+ super(msg, e);
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/registry/RegistryNotAvailableException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport;
+
+public class ConnectionLostException extends RuntimeException {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public ConnectionLostException(String msg) {
+ super(msg);
+ }
+
+ public ConnectionLostException(String msg, Exception e) {
+ super(msg, e);
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ConnectionLostException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport;
+
+import org.apache.uima.ducc.ps.service.IServiceComponent;
+import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+
+public interface IServiceTransport extends IServiceComponent {
+ // called by Protocal Handler. Any errors will be handled
+ // by instance of IServiceErrorHandler
+ public String dispatch(String request) throws TransportException;
+ // initialize transport
+ public void initialize() throws ServiceInitializationException;
+ // stop transport
+ public void stop();
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport;
+
+public interface ITargetURI {
+ public String asString(); // stringified target, http://localhost:8080/SomeApp
+ public String getProtocol(); // http, tcp, ...
+ public String getNodename();
+ public String getPort();
+ public String getContext(); // for http, this would be a servlet context
+ public String getDescription();
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/ITargetURI.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport;
+
+public class TransportException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public TransportException(Exception e) {
+ super(e);
+ }
+ public TransportException(String msg) {
+ super(msg);
+ }
+ public TransportException( String msg,Exception e) {
+ super(msg, e);
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+
+public class TransportStats implements IWindowStats{
+
+ private AtomicLong errorCount = new AtomicLong();
+ private AtomicLong successCount = new AtomicLong();
+ private AtomicLong errorCountSinceLastSuccess = new AtomicLong();
+
+
+
+ public synchronized void incrementErrorCount() {
+ errorCount.incrementAndGet();
+ errorCountSinceLastSuccess.incrementAndGet();
+ }
+ public synchronized void incrementSuccessCount() {
+ successCount.incrementAndGet();
+ // reset
+ errorCountSinceLastSuccess.set(0);
+ }
+
+ @Override
+ public long getErrorCount() {
+ return errorCount.get();
+ }
+
+ @Override
+ public long getSuccessCount() {
+ return successCount.get();
+ }
+
+ @Override
+ public long getErrorCountSinceLastSuccess() {
+ return errorCountSinceLastSuccess.get();
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/TransportStats.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+import com.thoughtworks.xstream.security.AnyTypePermission;
+import com.thoughtworks.xstream.security.NoTypePermission;
+
+public class XStreamUtils {
+
+ private static void initXStreanSecurity(XStream xStream) {
+ XStream.setupDefaultSecurity(xStream);
+ xStream.addPermission(NoTypePermission.NONE);
+ xStream.addPermission(AnyTypePermission.ANY);
+ }
+ public static String marshall( Object targetToMarshall) throws Exception {
+ synchronized(XStreamUtils.class) {
+ XStream xStream = new XStream(new DomDriver());
+ initXStreanSecurity(xStream);
+ return xStream.toXML(targetToMarshall);
+ }
+ }
+ public static Object unmarshall( String targetToUnmarshall) throws Exception {
+ synchronized(XStreamUtils.class) {
+ XStream xStream = new XStream(new DomDriver());
+ initXStreanSecurity(xStream);
+ //System.out.println("Recv'd:"+targetToUnmarshall);
+ return xStream.fromXML(targetToUnmarshall);
+ }
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport.http;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.errors.ServiceException;
+import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.registry.IRegistryClient;
+import org.apache.uima.ducc.ps.service.transport.IServiceTransport;
+import org.apache.uima.ducc.ps.service.transport.ITargetURI;
+import org.apache.uima.ducc.ps.service.transport.TransportException;
+import org.apache.uima.ducc.ps.service.transport.TransportStats;
+import org.apache.uima.ducc.ps.service.transport.target.NoOpTargetURI;
+import org.apache.uima.ducc.ps.service.transport.target.TargetURIFactory;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.Logger;
+
+public class HttpServiceTransport implements IServiceTransport {
+ private Logger logger = UIMAFramework.getLogger(HttpServiceTransport.class);
+ private HttpClient httpClient = null;
+ private PoolingHttpClientConnectionManager cMgr = null;
+ private int clientMaxConnections = 1;
+ private int clientMaxConnectionsPerRoute = 60;
+ private int clientMaxConnectionsPerHostPort = 0;
+ private ReentrantLock lock = new ReentrantLock();
+ private ReentrantLock registryLookupLock = new ReentrantLock();
+ private long threadSleepTime=10000; // millis
+ private final String nodeIP;
+ private final String nodeName;
+ private final String pid;
+ private ITargetURI currentTargetUrl = new NoOpTargetURI();
+ private static final String NA="N/A";
+ private TransportStats stats = new TransportStats();
+ private IRegistryClient registryClient;
+ // holds reference to HttpPost object for every thread. Key=thread id
+ private Map<Long,HttpPost> httpPostMap =
+ new HashMap<>();
+ private volatile boolean stopping = false;
+ private volatile boolean running = false;
+ private volatile boolean log = true;
+ /*
+ public HttpServiceTransport(IRegistryClient registryClient, int scaleout) {
+ // create instance of HttpServiceTransport with RegistryClient. The assumption
+ // is that the implementation of the client has been fully configured with
+ // registry URI and target id. We just pass in a NoOpTarget instead of null.
+ // The initialize() will use registry to lookup the client TargetURI and will
+ // create correct instance of ITargetURI based on what registry returns
+// this(new NoOpTargetURI(), registryClient, scaleout);
+ this(new NoOpTargetURI(), registryClient, scaleout);
+
+ }
+ */
+// private HttpServiceTransport(ITargetURI targetUrl, IRegistryClient registryClient, int scaleout) {
+ public HttpServiceTransport(IRegistryClient registryClient, int scaleout) throws ServiceException {
+ //TargetURIFactory.newTarget(registryClient.lookUp(new NoOpTargetURI().asString()));
+ /*
+ if ( registryClient == null ) {
+ // the default client just returns the same targetUrl
+ // No lookups
+ this.registryClient = new DefaultRegistryClient(targetUrl);
+ } else {
+ this.registryClient = registryClient;
+ }
+ */
+ this.registryClient = registryClient;
+ clientMaxConnections = scaleout;
+
+ try {
+ nodeIP = InetAddress.getLocalHost().getHostAddress();
+ nodeName=InetAddress.getLocalHost().getCanonicalHostName();
+ pid = getProcessIP(NA);
+ } catch( UnknownHostException e) {
+ throw new RuntimeException(new TransportException("HttpServiceTransport.ctor - Unable to determine Host Name and IP",e));
+ }
+
+ }
+ private HttpPost getPostMethodForCurrentThread() {
+ HttpPost postMethod;
+ if ( !httpPostMap.containsKey(Thread.currentThread().getId())) {
+ // each thread needs its own PostMethod
+ postMethod =
+ new HttpPost(currentTargetUrl.asString());
+ httpPostMap.put(Thread.currentThread().getId(),postMethod);
+ } else {
+ postMethod = httpPostMap.get(Thread.currentThread().getId());
+ }
+ return postMethod;
+ }
+ private String getProcessIP(final String fallback) {
+ // the following code returns '<pid>@<hostname>'
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ int pos = name.indexOf('@');
+
+ if (pos < 1) {
+ // pid not found
+ return fallback;
+ }
+
+ try {
+ return Long.toString(Long.parseLong(name.substring(0, pos)));
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ return fallback;
+ }
+ private void lookupNewTarget() {
+ registryLookupLock.lock();
+ while( !stopping ) {
+ try {
+ String newTarget = registryClient.lookUp(currentTargetUrl.asString());
+ // logger.log(Level.INFO, "Registry lookup succesfull - current target URL:"+newTarget);
+ currentTargetUrl = TargetURIFactory.newTarget(newTarget);
+ return;
+ } catch( Exception e) {
+ synchronized (httpClient) {
+
+ try {
+ httpClient.wait(threadSleepTime);
+ } catch( InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+ if (registryLookupLock.isHeldByCurrentThread()) {
+ registryLookupLock.unlock();
+ }
+ }
+ public void initialize() throws ServiceInitializationException {
+
+ // use plugged in registry to lookup target to connect to.
+ // Sets global: currentTarget
+ lookupNewTarget();
+
+ cMgr = new PoolingHttpClientConnectionManager();
+
+ if (clientMaxConnections > 0) {
+ cMgr.setMaxTotal(clientMaxConnections);
+ }
+ // Set default max connections per route
+ if (clientMaxConnectionsPerRoute > 0) {
+ cMgr.setDefaultMaxPerRoute(clientMaxConnectionsPerRoute);
+ }
+ HttpHost httpHost = new HttpHost(currentTargetUrl.asString(), Integer.valueOf(currentTargetUrl.getPort()), currentTargetUrl.getContext());
+ if (clientMaxConnectionsPerHostPort > 0) {
+ cMgr.setMaxPerRoute(new HttpRoute(httpHost), clientMaxConnectionsPerHostPort);
+ }
+
+ int timeout = 30;
+ SocketConfig socketConfig = SocketConfig.custom().setSoTimeout(timeout*1000).build();
+ // RequestConfig requestConfig = RequestConfig.custom()
+ // .setConnectTimeout(timeout * 1000)
+ // .setConnectionRequestTimeout(timeout * 1000)
+ // .setSocketTimeout(0).build();
+ cMgr.setDefaultSocketConfig(socketConfig);
+
+// System.out.println("HttpTransport Max Connections:"+cMgr.getMaxTotal());
+// httpClient = HttpClients.custom().
+// setConnectionManager(cMgr).
+// setDefaultRequestConfig(requestConfig).build();
+
+ httpClient = HttpClients.custom().setConnectionManager(cMgr).build();
+ if ( logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO,"Cmgr SoTimeout="+cMgr.getDefaultSocketConfig().getSoTimeout());
+ }
+ running = true;
+ }
+ private void addCommonHeaders( HttpPost method ) {
+ synchronized( HttpServiceTransport.class ) {
+
+ method.setHeader("IP", nodeIP);
+ method.setHeader("Hostname", nodeName);
+ method.setHeader("ThreadID",
+ String.valueOf(Thread.currentThread().getId()));
+ method.setHeader("PID", pid);
+
+ }
+
+ }
+
+ private HttpEntity wrapRequest(String serializedRequest) {
+ return new StringEntity(serializedRequest, ContentType.APPLICATION_XML);
+ }
+
+ private boolean isRunning() {
+ return running;
+ }
+
+ private String retryUntilSuccessfull(String request, HttpPost postMethod) {
+ String response="";
+ // Only one thread attempts recovery. Other threads will block here
+ // until connection to the remote is restored.
+ lock.lock();
+
+ // retry until service is stopped
+ while (isRunning()) {
+ try {
+ //response = dispatch(request);
+ response = doPost(postMethod);
+ // success, so release the lock so that other waiting threads
+ // can retry command
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+
+ break;
+
+ } catch (TransportException | IOException | URISyntaxException exx) {
+ // Connection still not available so sleep awhile
+ synchronized (httpClient) {
+ try {
+ httpClient.wait(threadSleepTime);
+ } catch( InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ lookupNewTarget();
+
+ }
+ }
+ return response;
+
+ }
+ private String doPost(HttpPost postMethod) throws URISyntaxException, IOException, TransportException {
+ postMethod.setURI(new URI(currentTargetUrl.asString()));
+ HttpResponse response = httpClient.execute(postMethod);
+ if ( stopping ) {
+ throw new TransportException("Service stopping - rejecting request");
+ }
+ HttpEntity entity = response.getEntity();
+ String serializedResponse = EntityUtils.toString(entity);
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() != 200 && logger.isLoggable(Level.WARNING) ) {
+
+ logger.log(Level.WARNING,"execute", "Unable to Communicate with client - Error:"+statusLine);
+ logger.log(Level.WARNING, "Content causing error:"+serializedResponse);
+ throw new TransportException(
+ "Http Client Unable to Communicate with a remote client - Error:" + statusLine);
+ }
+ stats.incrementSuccessCount();
+ return serializedResponse;
+ }
+ @Override
+ public String dispatch(String serializedRequest) throws TransportException {
+// System.out.println(".... in dispatch()...stopping="+stopping);
+ if ( stopping ) {
+ throw new IllegalStateException("Service transport has been stopped, unable to dispatch request");
+ }
+ HttpEntity e = wrapRequest(serializedRequest);
+ // Each thread has its own HttpPost method. If current thread
+ // doesnt have one, it will be created and added to the local
+ // Map. Subsequent requests will fetch it from the map using
+ // current thread ID as a key.
+ HttpPost postMethod = getPostMethodForCurrentThread();
+ addCommonHeaders(postMethod);
+ postMethod.setEntity(e);
+ String serializedResponse = null;
+ try {
+ serializedResponse = doPost(postMethod);
+
+ } catch ( NoHttpResponseException ex ) {
+ if ( stopping ) {
+ System.out.println("Process Thread:"+Thread.currentThread().getId()+" NoHttpResponseException ");
+ //ex.printStackTrace();
+ throw new TransportException(ex);
+ } else {
+ serializedResponse = retryUntilSuccessfull(serializedRequest, postMethod);
+ }
+
+ // timeout so try again
+ ex.printStackTrace();
+ } catch (HttpHostConnectException | UnknownHostException ex ) {
+ if ( stopping ) {
+ System.out.println("Process Thread:"+Thread.currentThread().getId()+" HttpHostConnectException ");
+
+ throw new TransportException(ex);
+ }
+
+ stats.incrementErrorCount();
+ //ex.printStackTrace();
+ Action action = handleConnectionError(ex);
+ if ( Action.CONTINUE.equals(action)) {
+ try {
+ //postMethod.setURI(new URI(currentTargetUrl.asString()));
+ // Lost connection to the Task Allocation App
+ // Block until connection is restored
+ if ( log ) {
+ log = false;
+
+ logger.log(Level.INFO, ">>>>>>>>>> Unable to connect to target:"+currentTargetUrl.asString()+" - retrying until successfull - with between retries "+threadSleepTime+" ms");
+ }
+ serializedResponse = retryUntilSuccessfull(serializedRequest, postMethod);
+ log = true;
+ logger.log(Level.INFO, "Established connection to target:"+currentTargetUrl.asString());
+
+ } catch( Exception ee) {
+ log = true;
+ // Fail here - bad URI
+ }
+
+
+ } else if ( Action.TERMINATE.equals(action)) {
+ ex.printStackTrace();
+ }
+
+ } catch (SocketException ex) {
+ if ( stopping ) {
+ System.out.println("Process Thread:"+Thread.currentThread().getId()+" SocketException ");
+ //ex.printStackTrace();
+ throw new TransportException(ex);
+ }
+
+ } catch (TransportException ex) {
+
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new TransportException(ex);
+ }
+ finally {
+ postMethod.releaseConnection();
+ }
+
+ return serializedResponse;
+
+ }
+ private Action handleConnectionError(Exception e) {
+ if ( e instanceof HttpHostConnectException || e instanceof UnknownHostException ) {
+ synchronized (httpClient) {
+ try {
+ httpClient.wait(threadSleepTime);
+ } catch( InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return Action.CONTINUE;
+ } else {
+ return Action.TERMINATE;
+ }
+
+ }
+ public void stop() {
+ stopping = true;
+ running = false;
+ logger.log(Level.INFO,this.getClass().getName()+" stop() called");
+ if ( cMgr != null ) {
+ cMgr.shutdown();
+ }
+ logger.log(Level.INFO,this.getClass().getName()+" stopped connection mgr");
+ }
+ public static void main(String[] args) {
+
+ }
+
+
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/HttpTargetURI.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/HttpTargetURI.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/HttpTargetURI.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/HttpTargetURI.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport.target;
+
+import org.apache.uima.ducc.ps.service.transport.ITargetURI;
+
+public class HttpTargetURI implements ITargetURI {
+
+ String target;
+ String targetContext;
+ String targetPort;
+ String protocol;
+ String targetDescription="";
+
+ public HttpTargetURI(String target) {
+ this.target = target;
+
+ int pos = target.indexOf("//");
+ protocol = target.substring(0, pos-1);
+ int ipEndPos = target.indexOf(":", pos);
+ String jdIP = target.substring(pos+2,ipEndPos);
+ int portEndPos = target.indexOf("/", ipEndPos);
+ targetContext = target.substring(portEndPos+1);
+ targetPort = target.substring(ipEndPos+1, portEndPos);
+
+
+ }
+
+ @Override
+ public String getProtocol() {
+ return protocol;
+ }
+
+ @Override
+ public String getNodename() {
+ return null;
+ }
+
+ @Override
+ public String getPort() {
+ return targetPort;
+ }
+
+ @Override
+ public String getContext() {
+ return targetContext;
+ }
+
+ @Override
+ public String asString() {
+ return target;
+ }
+ @Override
+ public String getDescription() {
+ return targetDescription;
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/HttpTargetURI.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/NoOpTargetURI.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/NoOpTargetURI.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/NoOpTargetURI.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/NoOpTargetURI.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport.target;
+
+import org.apache.uima.ducc.ps.service.transport.ITargetURI;
+
+public class NoOpTargetURI implements ITargetURI {
+
+ @Override
+ public String asString() {
+ return "";
+ }
+
+ @Override
+ public String getProtocol() {
+ return "";
+ }
+
+ @Override
+ public String getNodename() {
+ return "";
+ }
+
+ @Override
+ public String getPort() {
+ return "";
+ }
+
+ @Override
+ public String getContext() {
+ return "";
+ }
+
+ @Override
+ public String getDescription() {
+ return "";
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/NoOpTargetURI.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/SocketTargetURI.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/SocketTargetURI.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/SocketTargetURI.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/SocketTargetURI.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport.target;
+
+import org.apache.uima.ducc.ps.service.transport.ITargetURI;
+
+public class SocketTargetURI implements ITargetURI {
+ String target;
+
+ public SocketTargetURI(String target) {
+ this.target = target;
+ }
+
+ @Override
+ public String getProtocol() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getNodename() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getPort() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getContext() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String asString() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getDescription() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/SocketTargetURI.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/TargetURIFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/TargetURIFactory.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/TargetURIFactory.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/TargetURIFactory.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.transport.target;
+
+import org.apache.uima.ducc.ps.service.errors.ServiceException;
+import org.apache.uima.ducc.ps.service.transport.ITargetURI;
+
+public class TargetURIFactory {
+ private TargetURIFactory() {}
+
+ public static ITargetURI newTarget(String targetAsString) throws ServiceException {
+ if ( targetAsString.toLowerCase().startsWith("http:")) {
+ return new HttpTargetURI(targetAsString);
+ } else if ( targetAsString.toLowerCase().startsWith("tcp:")) {
+ return new SocketTargetURI(targetAsString);
+ } else {
+ throw new ServiceException("Registry provider unsupported URL protocol - Expected either tcp or http - Instead got "+targetAsString);
+ }
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/target/TargetURIFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaSerializer.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaSerializer.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaSerializer.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaSerializer.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.uima.ducc.ps.service.utils;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.io.Writer;
+
+import javax.xml.parsers.FactoryConfigurationError;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.impl.XmiCasDeserializer;
+import org.apache.uima.cas.impl.XmiCasSerializer;
+import org.apache.uima.util.Level;
+import org.apache.uima.util.XMLSerializer;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.SAXNotRecognizedException;
+import org.xml.sax.SAXNotSupportedException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+
+public class UimaSerializer {
+ private static final String LOAD_EXTERNAL_DTD = "http://apache.org/xml/features/nonvalidating/load-external-dtd";
+ private static final String EXTERNAL_GENERAL_ENTITIES = "http://xml.org/sax/features/external-general-entities";
+ private static final String EXTERNAL_PARAMETER_ENTITIES = "http://xml.org/sax/features/external-parameter-entities";
+ /**
+ * Utility method for serializing a CAS to an XMI String
+ */
+ public String serializeCasToXmi(CAS aCAS)
+ throws Exception {
+ Writer writer = new StringWriter();
+ try {
+ XMLSerializer xmlSer = new XMLSerializer(writer, false);
+ XmiCasSerializer ser = new XmiCasSerializer(aCAS.getTypeSystem());
+ ser.serialize(aCAS, xmlSer.getContentHandler());
+ return writer.toString();
+ } catch (SAXException e) {
+ throw e;
+ } finally {
+ writer.close();
+ }
+ }
+ private void secureXmlReader(XMLReader xmlReader) {
+ try {
+ xmlReader.setFeature(EXTERNAL_GENERAL_ENTITIES, false);
+ } catch (SAXNotRecognizedException e) {
+ UIMAFramework.getLogger().log(Level.WARNING,
+ "XMLReader didn't recognize feature " + EXTERNAL_GENERAL_ENTITIES);
+ } catch (SAXNotSupportedException e) {
+ UIMAFramework.getLogger().log(Level.WARNING,
+ "XMLReader doesn't support feature " + EXTERNAL_GENERAL_ENTITIES);
+ }
+
+ try {
+ xmlReader.setFeature(EXTERNAL_PARAMETER_ENTITIES, false);
+ } catch (SAXNotRecognizedException e) {
+ UIMAFramework.getLogger().log(Level.WARNING,
+ "XMLReader didn't recognize feature " + EXTERNAL_PARAMETER_ENTITIES);
+ } catch (SAXNotSupportedException e) {
+ UIMAFramework.getLogger().log(Level.WARNING,
+ "XMLReader doesn't support feature " + EXTERNAL_PARAMETER_ENTITIES);
+ }
+
+ try {
+ xmlReader.setFeature(LOAD_EXTERNAL_DTD,false);
+ } catch (SAXNotRecognizedException e) {
+ UIMAFramework.getLogger().log(Level.WARNING,
+ "XMLReader didn't recognized feature " + LOAD_EXTERNAL_DTD);
+ } catch (SAXNotSupportedException e) {
+ UIMAFramework.getLogger().log(Level.WARNING,
+ "XMLReader doesn't support feature " + LOAD_EXTERNAL_DTD);
+ }
+
+ }
+ /**
+ * Utility method for deserializing a CAS from an XMI String
+ * Does both processing of requests arriving to this service
+ * and responses returning to this service, or to a client.
+ */
+ public void deserializeCasFromXmi(String anXmlStr, CAS aCAS)
+ throws FactoryConfigurationError, ParserConfigurationException, SAXException, IOException {
+
+ XMLReader xmlReader = XMLReaderFactory.createXMLReader();
+ secureXmlReader(xmlReader);
+ Reader reader = new StringReader(anXmlStr);
+ XmiCasDeserializer deser = new XmiCasDeserializer(aCAS.getTypeSystem());
+ ContentHandler handler = deser.getXmiCasHandler(aCAS);
+ xmlReader.setContentHandler(handler);
+ xmlReader.parse(new InputSource(reader));
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaSerializer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaUtils.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaUtils.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaUtils.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaUtils.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.ps.service.utils;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.uima.Constants;
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.UIMARuntimeException;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.analysis_engine.impl.AnalysisEngineDescription_impl;
+import org.apache.uima.analysis_engine.metadata.FixedFlow;
+import org.apache.uima.analysis_engine.metadata.FlowControllerDeclaration;
+import org.apache.uima.analysis_engine.metadata.impl.FixedFlow_impl;
+import org.apache.uima.analysis_engine.metadata.impl.FlowControllerDeclaration_impl;
+import org.apache.uima.ducc.ps.service.errors.InvalidOverrideParameterException;
+import org.apache.uima.resource.RelativePathResolver;
+import org.apache.uima.resource.ResourceConfigurationException;
+import org.apache.uima.resource.ResourceCreationSpecifier;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resource.impl.RelativePathResolver_impl;
+import org.apache.uima.resource.metadata.ConfigurationParameter;
+import org.apache.uima.resource.metadata.ConfigurationParameterDeclarations;
+import org.apache.uima.resource.metadata.ConfigurationParameterSettings;
+import org.apache.uima.resource.metadata.Import;
+import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
+import org.apache.uima.resource.metadata.impl.Import_impl;
+import org.apache.uima.util.InvalidXMLException;
+import org.apache.uima.util.XMLInputSource;
+
+public class UimaUtils {
+
+ public static final String FlowControllerKey="FixedFlowController";
+ public static RelativePathResolver resolver = new RelativePathResolver_impl();
+
+ public static URL getRelativePathWithProtocol(String aRelativePath)
+ throws MalformedURLException {
+ URL relativeUrl;
+ try {
+ relativeUrl = new URL(aRelativePath);
+ } catch (MalformedURLException e) {
+ relativeUrl = new URL("file", "", aRelativePath);
+ }
+ return relativeUrl;
+ }
+
+ public static ResourceSpecifier getResourceSpecifier(String resourceFile)
+ throws Exception {
+ return UIMAFramework.getXMLParser().parseResourceSpecifier(
+ getXMLInputSource(resourceFile));
+ }
+
+ /**
+ * Use the UIMA routine to load an xml descriptor from the filesystem or the current classpath
+ * @param resourceName - resource to load by location or name
+ * @return
+ * @throws InvalidXMLException
+ */
+ public static XMLInputSource getXMLInputSource(String resourceFile)
+ throws InvalidXMLException {
+ return getXMLInputSource(resourceFile, Thread.currentThread().getContextClassLoader());
+ }
+
+ /**
+ * Use the UIMA routine to load an xml descriptor from the filesystem or the specified classloader
+ * @param resourceFile - resource to load by location or name
+ * @param classloader - class loader to use
+ * @return - input source stream
+ * @throws InvalidXMLException
+ */
+ public static XMLInputSource getXMLInputSource(String resourceFile, ClassLoader classloader)
+ throws InvalidXMLException {
+
+ // If the resourceFile ends in .xml then we look in the filesystem.
+ // If not, then we turn it into a path by replacing . with / and
+ // appending .xml, and look in the classpath or datapath.
+ // TODO - should this be synchronized since resolver is statis?
+
+ String resource = null;
+ try {
+ resourceFile = Utils.resolvePlaceholderIfExists(resourceFile, System.getProperties());
+ XMLInputSource in = null;
+ if (resourceFile.endsWith(".xml")) {
+ resource = resourceFile;
+ in = new XMLInputSource(resourceFile);
+ } else {
+ resource = resourceFile.replace('.', '/') + ".xml";
+ resolver.setPathResolverClassLoader(classloader);
+ URL relativeURL = resolver.resolveRelativePath(new URL("file", "", resource));
+ if (relativeURL == null) {
+ throw new InvalidXMLException(InvalidXMLException.IMPORT_BY_NAME_TARGET_NOT_FOUND,
+ new String[] { resource, resourceFile });
+ }
+ in = new XMLInputSource(relativeURL);
+ }
+ return in;
+ } catch (IOException e) {
+ throw new InvalidXMLException(
+ InvalidXMLException.IMPORT_FAILED_COULD_NOT_READ_FROM_URL,
+ new String[] { resource, resourceFile });
+ }
+ }
+
+ public static ConfigurationParameter findConfigurationParameter(
+ ConfigurationParameterDeclarations configurationParameterDeclarations,
+ String name) {
+ ConfigurationParameter retVal = null;
+ for (ConfigurationParameter parameter : configurationParameterDeclarations
+ .getConfigurationParameters()) {
+ if (name.equals(parameter.getName())) {
+ retVal = parameter;
+ break;
+ }
+ }
+ return retVal;
+ }
+
+ public static Object getOverrideValueObject(
+ ConfigurationParameter configurationParameter, String value)
+ throws ResourceConfigurationException {
+ Object retVal = value;
+ try {
+ if (configurationParameter.getType().equals("Integer")) {
+ retVal = Integer.parseInt(value);
+ } else if (configurationParameter.getType().equals("Boolean")) {
+ retVal = Boolean.parseBoolean(value);
+ } else if (configurationParameter.getType().equals("Float")) {
+ retVal = Float.parseFloat(value);
+ }
+ } catch (Throwable t) {
+ throw new ResourceConfigurationException(t);
+ }
+ return retVal;
+ }
+ /**
+ * Creates UIMA aggregate AE description from provided parts. Takes as input
+ * vararg of AE descriptor paths for CM, AE, and CC. It creates an aggregate
+ * description with each component identified by its implementation class.
+ * The generated aggregate uses fixed flow.
+ *
+ * @param overrides
+ * - a list containing overrides. Each component override is a
+ * separate list containing strings with format <name>=<value>
+ *
+ * @param descriptorPaths
+ * - paths to uima component descriptors
+ *
+ * @return - instantiated aggregate {@link AnalysisEngineDescription}
+ *
+ * @throws Exception
+ */
+ public static AnalysisEngineDescription createAggregateDescription(
+ String flowController, boolean multipleDeploymentAllowed, List<List<String>> overrides, String... descriptorPaths)
+ throws Exception {
+
+ // create the descriptor and set configuration parameters
+ AnalysisEngineDescription desc = new AnalysisEngineDescription_impl();
+ resolver.setPathResolverClassLoader(desc.getClass().getClassLoader());
+ desc.setFrameworkImplementation(Constants.JAVA_FRAMEWORK_NAME);
+ desc.setPrimitive(false);
+ ResourceSpecifier[] specifiers = new ResourceSpecifier[descriptorPaths.length];
+
+ // Allow scale up
+ desc.getAnalysisEngineMetaData().getOperationalProperties()
+ .setMultipleDeploymentAllowed(multipleDeploymentAllowed);
+ // Stores component names derived from implementation class
+ List<String> flowNames = new ArrayList<String>();
+ int inx = 0;
+ // First produce ResourceSpecifiers from provided descriptors
+ for (String aeDescription : descriptorPaths) {
+ /*
+ aeDescription = Utils.resolvePlaceholderIfExists(aeDescription,
+ System.getProperties());
+ XMLInputSource in = null;
+ if (!aeDescription.endsWith(".xml")) {
+ aeDescription = aeDescription.replace('.', '/') + ".xml";
+ URL relativeURL = resolver.resolveRelativePath(getRelativePathWithProtocol(aeDescription));
+// URL relativeURL = resolveRelativePath(aeDescription);
+ in = new XMLInputSource(relativeURL);
+ } else {
+ in = new XMLInputSource(aeDescription);
+ }
+ // XMLInputSource in = new XMLInputSource(aeDescription);
+ ResourceSpecifier specifier = UIMAFramework.getXMLParser()
+ .parseResourceSpecifier(in);
+ specifiers[inx++] = specifier;
+ */
+ specifiers[inx++] = getResourceSpecifier(aeDescription);
+ // UimaClassFactory.produceResourceSpecifier(aeDescription);
+ }
+
+ for (String aeDescription : descriptorPaths) {
+ Import descriptorImport = new Import_impl();
+ // If user provides a descriptor with .xml at the end, assume he
+ // wants import by location
+ if (aeDescription.endsWith(".xml")) {
+ aeDescription = Utils.resolvePlaceholderIfExists(aeDescription,
+ System.getProperties());
+ if (!aeDescription.startsWith("file:")) {
+ aeDescription = "file:" + aeDescription;
+ }
+ descriptorImport.setLocation(aeDescription);
+ } else {
+ // uima import by name expects dot separated path as in
+ // a.b.descriptor and no .xml at the end
+ descriptorImport.setName(aeDescription);
+ }
+ String key = new String(aeDescription);
+ if (key.startsWith("file:")) {
+ key = key.substring(5); // skip "file:"
+ }
+ if (key.endsWith(".xml")) {
+ key = key.substring(0, key.indexOf(".xml")); // strip ".xml"
+ }
+ // preprocess the ae descriptor name to replace "/" and
+ // "\" with ".". We will use the ae
+ // descriptor name as AE key in the aggregate
+ if (key.indexOf("/") != -1) {
+ key = key.replaceAll("/", ".");
+ }
+ if (key.indexOf("\\") != -1) {
+ key = key.replaceAll("\\\\", ".");
+ }
+ key = key.substring(key.lastIndexOf(".") + 1);
+ desc.getDelegateAnalysisEngineSpecifiersWithImports().put(key,
+ descriptorImport);
+ flowNames.add(key);
+
+ }
+ if ( flowController != null ) {
+ FlowControllerDeclaration fcd = new FlowControllerDeclaration_impl();
+ desc.setFlowControllerDeclaration(fcd);
+ fcd.setImport(new Import_impl());
+ fcd.setKey(FlowControllerKey);
+ fcd.getImport().setName(flowController);
+ }
+
+ FixedFlow fixedFlow = new FixedFlow_impl();
+ fixedFlow.setFixedFlow(flowNames.toArray(new String[flowNames.size()]));
+ desc.getAnalysisEngineMetaData().setFlowConstraints(fixedFlow);
+ addOverrides(overrides, desc, specifiers, flowNames);
+
+ return desc;
+ }
+
+ private static void addOverrides(List<List<String>> overrides,
+ AnalysisEngineDescription desc, ResourceSpecifier[] specifiers,
+ List<String> flowNames) throws Exception {
+
+ ConfigurationParameterDeclarations aggregateDeclarations = desc
+ .getAnalysisEngineMetaData()
+ .getConfigurationParameterDeclarations();
+ ConfigurationParameterSettings aggregateSetttings = desc
+ .getAnalysisEngineMetaData()
+ .getConfigurationParameterSettings();
+ int indx = 0;
+ for (List<String> componentOverrides : overrides) {
+ if ( specifiers[indx] instanceof ResourceCreationSpecifier ) {
+ addComponentOverrides(flowNames.get(indx), componentOverrides,
+ (ResourceCreationSpecifier) specifiers[indx],
+ aggregateDeclarations, aggregateSetttings);
+ }
+ indx++;
+ }
+
+ }
+
+ /**
+ * Modifies aggregate descriptor by adding component specific overrides.
+ *
+ * @param key
+ * - component key
+ * @param componentOverrides
+ * - List of override params where element is expressed as String
+ * with format <name>=<value>
+ * @param specifier
+ * - component resource specifier
+ * @param aggregateDeclarations
+ * - aggregate ConfigurationParameterDeclarations
+ * @param aggregateSetttings
+ * - aggregate ConfigurationParameterSettings
+ */
+ private static void addComponentOverrides(String key,
+ List<String> componentOverrides,
+// AnalysisEngineDescription specifier,
+ ResourceCreationSpecifier specifier,
+ ConfigurationParameterDeclarations aggregateDeclarations,
+ ConfigurationParameterSettings aggregateSetttings) throws Exception {
+
+ if (componentOverrides == null || componentOverrides.isEmpty()) { // no
+ // overrides
+ return; // nothing to do
+ }
+ processOverrides(key, componentOverrides,
+ specifier, aggregateDeclarations,
+ // (ResourceCreationSpecifier) specifier, aggregateDeclarations,
+ aggregateSetttings);
+
+ }
+
+ private static void processOverrides(String key,
+ List<String> componentOverrides,
+ ResourceCreationSpecifier specifier,
+ ConfigurationParameterDeclarations aggregateDeclarations,
+ ConfigurationParameterSettings aggregateSetttings) throws Exception {
+ // Process overrides
+ for (String cmOverride : componentOverrides) {
+ System.out.println(".... Processing Override:"+cmOverride);
+ // each override is expressed as <name>=<value> pair, so split on
+ // the first '=' found ... in case the value contains an '='
+ String[] nvp = cmOverride.split("=", 2);
+ // Fetch component parameter declarations to get the primitive type
+ // of the parameter
+ ConfigurationParameterDeclarations componentParameterDeclarations = specifier
+ .getMetaData().getConfigurationParameterDeclarations();
+ // Iterate over component parameter declarations looking to find one
+ // with the same name
+ // as provided in the override. On match, add an override to the
+ // aggregate and preserve
+ // the type defined for the parameter in the component descriptor.
+ // If no match, throw
+ // an exception
+ boolean found = false;
+ for (ConfigurationParameter parameter : componentParameterDeclarations
+ .getConfigurationParameters()) {
+ if (nvp[0].equals(parameter.getName())) {
+ addParam(key, nvp, parameter, aggregateDeclarations);
+ addParamValue(nvp, parameter, aggregateSetttings);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new UIMARuntimeException(
+ new InvalidOverrideParameterException(
+ "Override Parameter:"
+ + nvp[0]
+ + " is not defined for the component with key: "
+ + key));
+ }
+ }
+
+ }
+
+ /**
+ * Adds parameter to aggregate ConfigurationParameterDeclarations.
+ *
+ * @param key
+ * - component key
+ * @param nvp
+ * - override name value pair
+ * @param parameter
+ * - matching ConfigurationParameter instance from component
+ * descriptor or null
+ * @param aggregateDeclarations
+ * - aggregate ConfigurationParameterDeclarations instance
+ */
+ private static void addParam(String key, String[] nvp,
+ ConfigurationParameter parameter,
+ ConfigurationParameterDeclarations aggregateDeclarations) {
+ ConfigurationParameter cfgParam = new ConfigurationParameter_impl();
+ cfgParam.setName(nvp[0]);
+ if (parameter == null) { // component descriptor doesnt contain a
+ // parameter provided in the override list.
+ // Default to String
+ cfgParam.setType("String"); // create String param as default
+ } else {
+ cfgParam.setType(parameter.getType());
+ }
+// if ( key.equals(FlowControllerKey)) {
+// cfgParam.addOverride(key + "/ActionAfterCasMultiplier");
+// } else {
+// cfgParam.addOverride(key + "/" + nvp[0]);
+// }
+ cfgParam.addOverride(key + "/" + nvp[0]);
+ aggregateDeclarations.addConfigurationParameter(cfgParam);
+
+ }
+
+ private static void addParamValue(String[] nvp,
+ ConfigurationParameter parameter,
+ ConfigurationParameterSettings aggregateSettings) {
+
+ Object value = nvp[1]; // default is String value
+ if (parameter != null) {
+ if (parameter.getType().equals("Integer")) {
+ value = Integer.parseInt(nvp[1]);
+ } else if (parameter.getType().equals("Boolean")) {
+ value = Boolean.parseBoolean(nvp[1]);
+ } else if (parameter.getType().equals("Float")) {
+ value = Float.parseFloat(nvp[1]);
+ }
+ aggregateSettings.setParameterValue(nvp[0], value);
+ } else {
+ aggregateSettings.setParameterValue(nvp[0], value);
+ }
+ }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/UimaUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/Utils.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/Utils.java?rev=1830622&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/Utils.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/Utils.java Mon Apr 30 18:59:04 2018
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.ps.service.utils;
+
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+public class Utils {
+
+ /**
+ * Resolves placeholders in provided contents using java's Matcher. Finds
+ * all occurances of ${<placeholder>} and resolves each using System
+ * properties which holds <placeholder>=<value> pairs.
+ *
+ * @param contents
+ * - target text containing placeholder(s)
+ * @param props
+ * - Properties object holding key/value pairs
+ * @return - text with resolved placeholders
+ *
+ * @throws Exception
+ */
+ public static String resolvePlaceholders(String contents) {
+ return resolvePlaceholders(contents, System.getProperties());
+ }
+
+ /**
+ * Resolves placeholders in provided contents using java's Matcher. Finds
+ * all occurances of ${<placeholder>} and resolves each using provided
+ * Properties object which holds <placeholder>=<value> pairs. If the
+ * placeholder not found then tries the System properties.
+ *
+ * @param contents
+ * - target text containing placeholder(s)
+ * @param props
+ * - Properties object holding key/value pairs
+ * @return - text with resolved placeholders
+ *
+ * @throws Exception
+ */
+ public static String resolvePlaceholders(String contents, Properties props) {
+ // Placeholders syntax ${<placeholder>}
+ Pattern placeHolderPattern = Pattern.compile("\\$\\{(.*?)\\}");
+
+ java.util.regex.Matcher matcher = placeHolderPattern.matcher(contents);
+
+ StringBuffer sb = new StringBuffer();
+ while (matcher.find()) {
+ // extract placeholder
+ final String key = matcher.group(1);
+ // Find value for extracted placeholder.
+ String placeholderValue = props.getProperty(key);
+ if (placeholderValue == null) {
+ placeholderValue = System.getProperty(key);
+ if (placeholderValue == null) {
+ throw new IllegalArgumentException(
+ "Missing value for placeholder: " + key);
+ }
+ }
+ matcher.appendReplacement(sb, placeholderValue);
+ }
+ matcher.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * Resolves placeholder using Spring Framework utility class
+ *
+ *
+ * @param value
+ * @param props
+ * @return
+ */
+ public static String resolvePlaceholderIfExists(String value,
+ Properties props) {
+ String retVal = value;
+ if (value != null && value.contains("${")) {
+ retVal = resolvePlaceholders(value, props);
+ }
+ return retVal;
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/utils/Utils.java
------------------------------------------------------------------------------
svn:eol-style = native