You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@olingo.apache.org by mi...@apache.org on 2015/07/08 20:57:21 UTC
olingo-odata4 git commit: [OLINGO-708] Minor code clean up in
TecAsyncSvc
Repository: olingo-odata4
Updated Branches:
refs/heads/OLINGO-708_AsyncSupportTec 31e3a8bd0 -> 8f6ceeae1
[OLINGO-708] Minor code clean up in TecAsyncSvc
Project: http://git-wip-us.apache.org/repos/asf/olingo-odata4/repo
Commit: http://git-wip-us.apache.org/repos/asf/olingo-odata4/commit/8f6ceeae
Tree: http://git-wip-us.apache.org/repos/asf/olingo-odata4/tree/8f6ceeae
Diff: http://git-wip-us.apache.org/repos/asf/olingo-odata4/diff/8f6ceeae
Branch: refs/heads/OLINGO-708_AsyncSupportTec
Commit: 8f6ceeae16733798cd27ae2384ed58780e2424d0
Parents: 31e3a8b
Author: mibo <mi...@apache.org>
Authored: Wed Jul 8 20:57:03 2015 +0200
Committer: mibo <mi...@apache.org>
Committed: Wed Jul 8 20:57:03 2015 +0200
----------------------------------------------------------------------
.../server/tecsvc/async/AsyncProcessor.java | 198 ++++++++++---------
.../tecsvc/async/TechnicalAsyncService.java | 127 ++++++++----
.../async/TechnicalStatusMonitorServlet.java | 31 +--
3 files changed, 207 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/8f6ceeae/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
index 99b81d6..e6d662e 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
@@ -18,6 +18,14 @@
*/
package org.apache.olingo.server.tecsvc.async;
+import org.apache.olingo.commons.api.http.HttpHeader;
+import org.apache.olingo.commons.api.http.HttpStatusCode;
+import org.apache.olingo.server.api.ODataApplicationException;
+import org.apache.olingo.server.api.ODataLibraryException;
+import org.apache.olingo.server.api.ODataRequest;
+import org.apache.olingo.server.api.ODataResponse;
+import org.apache.olingo.server.api.processor.Processor;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -35,33 +43,35 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
-import org.apache.olingo.commons.api.http.HttpHeader;
-import org.apache.olingo.commons.api.http.HttpStatusCode;
-import org.apache.olingo.server.api.ODataApplicationException;
-import org.apache.olingo.server.api.ODataLibraryException;
-import org.apache.olingo.server.api.ODataRequest;
-import org.apache.olingo.server.api.ODataResponse;
-import org.apache.olingo.server.api.processor.Processor;
-
+/**
+ * Async processor "wraps" a Processor (or a subclass of it) to provide asynchronous support functionality
+ * in combination with the TechnicalAsyncService.
+ *
+ * @param <T> "wrapped" Processor
+ */
public class AsyncProcessor<T extends Processor> {
- private final MyInvocationHandler handler;
- private final TechnicalAsyncService service;
- private final T proxyProcessor;
- private String location;
- private String preferHeader;
-
- private static class MyInvocationHandler implements InvocationHandler {
+ private final ProcessorInvocationHandler handler;
+ private final TechnicalAsyncService service;
+ private final T proxyProcessor;
+ private String location;
+ private String preferHeader;
+
+ /**
+ * InvocationHandler which is used as proxy for the Processor method.
+ */
+ private static class ProcessorInvocationHandler implements InvocationHandler {
private final Object wrappedInstance;
private Method invokeMethod;
private Object[] invokeParameters;
+ private ODataResponse processResponse;
- public MyInvocationHandler(Object wrappedInstance) {
+ public ProcessorInvocationHandler(Object wrappedInstance) {
this.wrappedInstance = wrappedInstance;
}
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
- if(Processor.class.isAssignableFrom(method.getDeclaringClass())) {
+ if (Processor.class.isAssignableFrom(method.getDeclaringClass())) {
invokeMethod = method;
invokeParameters = objects;
}
@@ -69,8 +79,6 @@ public class AsyncProcessor<T extends Processor> {
return null;
}
- ODataResponse processResponse;
-
public Object process() throws InvocationTargetException, IllegalAccessException {
processResponse = new ODataResponse();
replaceInvokeParameter(processResponse);
@@ -82,7 +90,7 @@ public class AsyncProcessor<T extends Processor> {
}
public <P> void replaceInvokeParameter(P replacement) {
- if(replacement == null) {
+ if (replacement == null) {
return;
}
@@ -97,81 +105,107 @@ public class AsyncProcessor<T extends Processor> {
invokeParameters = copy.toArray();
}
-
public ODataResponse getProcessResponse() {
return processResponse;
}
+
+ Object getWrappedInstance() {
+ return this.wrappedInstance;
+ }
}
public AsyncProcessor(T processor, Class<T> processorInterface, TechnicalAsyncService service) {
- Class<? extends Processor> aClass = processor.getClass();
- Class[] interfaces = aClass.getInterfaces();
- handler = new MyInvocationHandler(processor);
- Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler);
- proxyProcessor = processorInterface.cast(proxyInstance);
- this.service = service;
- }
+ Class<? extends Processor> aClass = processor.getClass();
+ Class[] interfaces = aClass.getInterfaces();
+ handler = new ProcessorInvocationHandler(processor);
+ Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler);
+ proxyProcessor = processorInterface.cast(proxyInstance);
+ this.service = service;
+ }
- public T prepareFor() {
- return proxyProcessor;
- }
+ public T prepareFor() {
+ return proxyProcessor;
+ }
- public ODataRequest getRequest() {
- return getParameter(ODataRequest.class);
- }
+ public ODataRequest getRequest() {
+ return getParameter(ODataRequest.class);
+ }
- public ODataResponse getResponse() {
- return getParameter(ODataResponse.class);
- }
+ public ODataResponse getResponse() {
+ return getParameter(ODataResponse.class);
+ }
- public ODataResponse getProcessResponse() {
- return handler.getProcessResponse();
- }
+ public ODataResponse getProcessResponse() {
+ return handler.getProcessResponse();
+ }
- private <P> P getParameter(Class<P> parameterClass) {
- for (Object parameter : handler.getInvokeParameters()) {
- if (parameter != null && parameterClass == parameter.getClass()) {
- return parameterClass.cast(parameter);
- }
+ private <P> P getParameter(Class<P> parameterClass) {
+ for (Object parameter : handler.getInvokeParameters()) {
+ if (parameter != null && parameterClass == parameter.getClass()) {
+ return parameterClass.cast(parameter);
}
- return null;
}
+ return null;
+ }
- public String processAsync() throws ODataApplicationException, ODataLibraryException {
- preferHeader = getRequest().getHeader(HttpHeader.PREFER);
- ODataRequest request = copyRequest(getRequest());
- handler.replaceInvokeParameter(request);
- handler.replaceInvokeParameter(new ODataResponse());
- return service.processAsynchronous(this);
- }
+ public String getPreferHeader() {
+ return preferHeader;
+ }
- Object process() throws InvocationTargetException, IllegalAccessException {
- return handler.process();
- }
+ public String getLocation() {
+ return location;
+ }
- private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException {
- ODataRequest req = new ODataRequest();
- req.setBody(copyRequestBody(request));
- req.setMethod(request.getMethod());
- req.setRawBaseUri(request.getRawBaseUri());
- req.setRawODataPath(request.getRawODataPath());
- req.setRawQueryPath(request.getRawQueryPath());
- req.setRawRequestUri(request.getRawRequestUri());
- req.setRawServiceResolutionUri(request.getRawServiceResolutionUri());
-
- for (Map.Entry<String, List<String>> header : request.getAllHeaders().entrySet()) {
- if(HttpHeader.PREFER.toLowerCase().equals(
- header.getKey().toLowerCase())) {
- preferHeader = header.getValue().get(0);
- } else {
- req.addHeader(header.getKey(), header.getValue());
- }
- }
+ public Class<?> getProcessorClass() {
+ return handler.getWrappedInstance().getClass();
+ }
+
+ /**
+ * Starts the asynchronous processing and returns the id for this process.
+ *
+ * @return the id for this process
+ * @throws ODataApplicationException
+ * @throws ODataLibraryException
+ */
+ public String processAsync() throws ODataApplicationException, ODataLibraryException {
+ preferHeader = getRequest().getHeader(HttpHeader.PREFER);
+ ODataRequest request = copyRequest(getRequest());
+ handler.replaceInvokeParameter(request);
+ handler.replaceInvokeParameter(new ODataResponse());
+ return service.processAsynchronous(this);
+ }
+
+ Object process() throws InvocationTargetException, IllegalAccessException {
+ return handler.process();
+ }
+
+ void setLocation(String loc) {
+ this.location = loc;
+ }
- return req;
+ private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException {
+ ODataRequest req = new ODataRequest();
+ req.setBody(copyRequestBody(request));
+ req.setMethod(request.getMethod());
+ req.setRawBaseUri(request.getRawBaseUri());
+ req.setRawODataPath(request.getRawODataPath());
+ req.setRawQueryPath(request.getRawQueryPath());
+ req.setRawRequestUri(request.getRawRequestUri());
+ req.setRawServiceResolutionUri(request.getRawServiceResolutionUri());
+
+ for (Map.Entry<String, List<String>> header : request.getAllHeaders().entrySet()) {
+ if (HttpHeader.PREFER.toLowerCase().equals(
+ header.getKey().toLowerCase())) {
+ preferHeader = header.getValue().get(0);
+ } else {
+ req.addHeader(header.getKey(), header.getValue());
+ }
}
+ return req;
+ }
+
private InputStream copyRequestBody(ODataRequest request) throws ODataApplicationException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
InputStream input = request.getBody();
@@ -193,16 +227,4 @@ public class AsyncProcessor<T extends Processor> {
}
return null;
}
-
- public String getPreferHeader() {
- return preferHeader;
- }
-
- public String getLocation() {
- return location;
- }
-
- void setLocation(String loc) {
- this.location = loc;
- }
- }
+}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/8f6ceeae/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
index 6a1cede..a7cfedf 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
@@ -18,20 +18,6 @@
*/
package org.apache.olingo.server.tecsvc.async;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.olingo.commons.api.ODataRuntimeException;
import org.apache.olingo.commons.api.format.ContentType;
import org.apache.olingo.commons.api.http.HttpHeader;
@@ -46,17 +32,57 @@ import org.apache.olingo.server.api.serializer.SerializerException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+/**
+ * The TechnicalAsyncService provides asynchronous support for any Processor.
+ * To use it, the following steps are necessary:
+ * <ul>
+ * <li>Get the instance</li>
+ * <li>Create an instance of the Processor which should be wrapped for asynchronous support
+ * (do not forget to call the <code>init(...)</code> method on the processor)</li>
+ * <li>register the Processor instance via the <code>register(...)</code> method</li>
+ * <li>prepare the corresponding method with the request parameters via the
+ * <code>prepareFor()</code> method at the AsyncProcessor</li>
+ * <li>start the async processing via the <code>processAsync()</code> method</li>
+ * </ul>
+ * A short code snippet is shown below:
+ * <p>
+ * <code>
+ * TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
+ * TechnicalEntityProcessor processor = new TechnicalEntityProcessor(dataProvider, serviceMetadata);
+ * processor.init(odata, serviceMetadata);
+ * AsyncProcessor&lt;EntityProcessor&gt; asyncProcessor = asyncService.register(processor, EntityProcessor.class);
+ * asyncProcessor.prepareFor().readEntity(request, response, uriInfo, requestedFormat);
+ * String location = asyncProcessor.processAsync();
+ * </code>
+ * </p>
+ */
public class TechnicalAsyncService {
public static final String TEC_ASYNC_SLEEP = "tec.sleep";
+ public static final String STATUS_MONITOR_TOKEN = "status";
private static final Map<String, AsyncRunner> LOCATION_2_ASYNC_RUNNER =
Collections.synchronizedMap(new HashMap<String, AsyncRunner>());
private static final ExecutorService ASYNC_REQUEST_EXECUTOR = Executors.newFixedThreadPool(10);
private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
- public static final String STATUS_MONITOR_TOKEN = "status";
-
public <T extends Processor> AsyncProcessor<T> register(T processor, Class<T> processorInterface) {
return new AsyncProcessor<T>(processor, processorInterface, this);
@@ -73,7 +99,6 @@ public class TechnicalAsyncService {
updateHeader(response, HttpStatusCode.ACCEPTED, location);
}
-
private static final class AsyncProcessorHolder {
private static final TechnicalAsyncService INSTANCE = new TechnicalAsyncService();
}
@@ -116,12 +141,27 @@ public class TechnicalAsyncService {
} else {
response.setStatus(HttpStatusCode.ACCEPTED.getStatusCode());
response.setHeader(HttpHeader.LOCATION, location);
- String content = "In progress for async location = " + location;
- writeToResponse(response, content);
}
}
}
+ public void listQueue(HttpServletResponse response) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("<html><header/><body><h1>Queued requests</h1><ul>");
+ for (Map.Entry<String, AsyncRunner> entry : LOCATION_2_ASYNC_RUNNER.entrySet()) {
+ AsyncProcessor asyncProcessor = entry.getValue().getDispatched();
+ sb.append("<li><b>ID: </b>").append(entry.getKey()).append("<br/>")
+ .append("<b>Location: </b>").append(asyncProcessor.getLocation()).append("<br/>")
+ .append("<b>Processor: </b>").append(asyncProcessor.getProcessorClass().getSimpleName()).append("<br/>")
+ .append("<b>Finished: </b>").append(entry.getValue().isFinished()).append("<br/>")
+ .append("</li>");
+ }
+ sb.append("</ul></body></html>");
+
+ writeToResponse(response, sb.toString());
+ }
+
+
private static void writeToResponse(HttpServletResponse response, InputStream input) throws IOException {
copy(input, response.getOutputStream());
}
@@ -154,36 +194,47 @@ public class TechnicalAsyncService {
writeToResponse(response, odResponseStream);
}
- static void writeHttpResponse(final ODataResponse odResponse, final HttpServletResponse response) throws IOException {
- response.setStatus(odResponse.getStatusCode());
-
- for (Map.Entry<String, String> entry : odResponse.getHeaders().entrySet()) {
- response.setHeader(entry.getKey(), entry.getValue());
- }
-
- copy(odResponse.getContent(), response.getOutputStream());
- }
+// static void copy(final InputStream input, final OutputStream output) {
+// if(output == null || input == null) {
+// return;
+// }
+//
+// try {
+// byte[] buffer = new byte[1024];
+// int n;
+// while (-1 != (n = input.read(buffer))) {
+// output.write(buffer, 0, n);
+// }
+// } catch (IOException e) {
+// throw new ODataRuntimeException(e);
+// } finally {
+// closeStream(output);
+// closeStream(input);
+// }
+// }
static void copy(final InputStream input, final OutputStream output) {
- if(output == null || input == null) {
+ if (output == null || input == null) {
return;
}
try {
- byte[] buffer = new byte[1024];
- int n;
- while (-1 != (n = input.read(buffer))) {
- output.write(buffer, 0, n);
+ ByteBuffer inBuffer = ByteBuffer.allocate(8192);
+ ReadableByteChannel ic = Channels.newChannel(input);
+ WritableByteChannel oc = Channels.newChannel(output);
+ while (ic.read(inBuffer) > 0) {
+ inBuffer.flip();
+ oc.write(inBuffer);
+ inBuffer.rewind();
}
} catch (IOException e) {
- throw new ODataRuntimeException(e);
+ throw new ODataRuntimeException("Error on reading request content");
} finally {
- closeStream(output);
closeStream(input);
+ closeStream(output);
}
}
-
private static void closeStream(final Closeable closeable) {
if (closeable != null) {
try {
@@ -194,7 +245,6 @@ public class TechnicalAsyncService {
}
}
-
private String createNewAsyncLocation(ODataRequest request) {
int pos = request.getRawBaseUri().lastIndexOf("/") + 1;
return request.getRawBaseUri().substring(0, pos) + STATUS_MONITOR_TOKEN + "/" + ID_GENERATOR.incrementAndGet();
@@ -204,6 +254,9 @@ public class TechnicalAsyncService {
return request.getRequestURL().toString();
}
+ /**
+ * Runnable for the AsyncProcessor.
+ */
private static class AsyncRunner implements Runnable {
private final AsyncProcessor dispatched;
private int defaultSleepTimeInSeconds = 0;
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/8f6ceeae/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
index 8fd22cf..6ed3722 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
@@ -18,33 +18,14 @@
*/
package org.apache.olingo.server.tecsvc.async;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
-
-import org.apache.olingo.server.api.OData;
-import org.apache.olingo.server.api.ODataHttpHandler;
-import org.apache.olingo.server.api.ServiceMetadata;
-import org.apache.olingo.server.api.edmx.EdmxReference;
-import org.apache.olingo.server.api.edmx.EdmxReferenceInclude;
-import org.apache.olingo.server.tecsvc.ETagSupport;
-import org.apache.olingo.server.tecsvc.MetadataETagSupport;
-import org.apache.olingo.server.tecsvc.data.DataProvider;
-import org.apache.olingo.server.tecsvc.processor.TechnicalActionProcessor;
-import org.apache.olingo.server.tecsvc.processor.TechnicalBatchProcessor;
-import org.apache.olingo.server.tecsvc.processor.TechnicalEntityProcessor;
-import org.apache.olingo.server.tecsvc.processor.TechnicalPrimitiveComplexProcessor;
-import org.apache.olingo.server.tecsvc.provider.EdmTechProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
public class TechnicalStatusMonitorServlet extends HttpServlet {
@@ -55,8 +36,10 @@ public class TechnicalStatusMonitorServlet extends HttpServlet {
protected void service(final HttpServletRequest request, final HttpServletResponse response)
throws ServletException, IOException {
try {
- if(TechnicalAsyncService.getInstance().isStatusMonitorResource(request)) {
- TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
+ TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
+ if("/list".equals(request.getPathInfo())) {
+ asyncService.listQueue(response);
+ } else if(asyncService.isStatusMonitorResource(request)) {
asyncService.handle(request, response);
}
} catch (final Exception e) {