You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@olingo.apache.org by mi...@apache.org on 2015/07/08 20:57:21 UTC

olingo-odata4 git commit: [OLINGO-708] Minor code clean up in TecAsyncSvc

Repository: olingo-odata4
Updated Branches:
  refs/heads/OLINGO-708_AsyncSupportTec 31e3a8bd0 -> 8f6ceeae1


[OLINGO-708] Minor code clean up in TecAsyncSvc


Project: http://git-wip-us.apache.org/repos/asf/olingo-odata4/repo
Commit: http://git-wip-us.apache.org/repos/asf/olingo-odata4/commit/8f6ceeae
Tree: http://git-wip-us.apache.org/repos/asf/olingo-odata4/tree/8f6ceeae
Diff: http://git-wip-us.apache.org/repos/asf/olingo-odata4/diff/8f6ceeae

Branch: refs/heads/OLINGO-708_AsyncSupportTec
Commit: 8f6ceeae16733798cd27ae2384ed58780e2424d0
Parents: 31e3a8b
Author: mibo <mi...@apache.org>
Authored: Wed Jul 8 20:57:03 2015 +0200
Committer: mibo <mi...@apache.org>
Committed: Wed Jul 8 20:57:03 2015 +0200

----------------------------------------------------------------------
 .../server/tecsvc/async/AsyncProcessor.java     | 198 ++++++++++---------
 .../tecsvc/async/TechnicalAsyncService.java     | 127 ++++++++----
 .../async/TechnicalStatusMonitorServlet.java    |  31 +--
 3 files changed, 207 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/8f6ceeae/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
index 99b81d6..e6d662e 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/AsyncProcessor.java
@@ -18,6 +18,14 @@
  */
 package org.apache.olingo.server.tecsvc.async;
 
+import org.apache.olingo.commons.api.http.HttpHeader;
+import org.apache.olingo.commons.api.http.HttpStatusCode;
+import org.apache.olingo.server.api.ODataApplicationException;
+import org.apache.olingo.server.api.ODataLibraryException;
+import org.apache.olingo.server.api.ODataRequest;
+import org.apache.olingo.server.api.ODataResponse;
+import org.apache.olingo.server.api.processor.Processor;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,33 +43,35 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
-import org.apache.olingo.commons.api.http.HttpHeader;
-import org.apache.olingo.commons.api.http.HttpStatusCode;
-import org.apache.olingo.server.api.ODataApplicationException;
-import org.apache.olingo.server.api.ODataLibraryException;
-import org.apache.olingo.server.api.ODataRequest;
-import org.apache.olingo.server.api.ODataResponse;
-import org.apache.olingo.server.api.processor.Processor;
-
+/**
+ * Async processor "wraps" a Processor (or a subclass of it) to provide asynchronous support functionality
+ * in combination with the TechnicalAsyncService.
+ *
+ * @param <T> "wrapped" Processor
+ */
 public class AsyncProcessor<T extends Processor> {
-    private final MyInvocationHandler handler;
-    private final TechnicalAsyncService service;
-    private final T proxyProcessor;
-    private String location;
-    private String preferHeader;
-
-  private static class MyInvocationHandler implements InvocationHandler {
+  private final ProcessorInvocationHandler handler;
+  private final TechnicalAsyncService service;
+  private final T proxyProcessor;
+  private String location;
+  private String preferHeader;
+
+  /**
+   * InvocationHandler which is used as proxy for the Processor method.
+   */
+  private static class ProcessorInvocationHandler implements InvocationHandler {
     private final Object wrappedInstance;
     private Method invokeMethod;
     private Object[] invokeParameters;
+    private ODataResponse processResponse;
 
-    public MyInvocationHandler(Object wrappedInstance) {
+    public ProcessorInvocationHandler(Object wrappedInstance) {
       this.wrappedInstance = wrappedInstance;
     }
 
     @Override
     public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
-      if(Processor.class.isAssignableFrom(method.getDeclaringClass())) {
+      if (Processor.class.isAssignableFrom(method.getDeclaringClass())) {
         invokeMethod = method;
         invokeParameters = objects;
       }
@@ -69,8 +79,6 @@ public class AsyncProcessor<T extends Processor> {
       return null;
     }
 
-    ODataResponse processResponse;
-
     public Object process() throws InvocationTargetException, IllegalAccessException {
       processResponse = new ODataResponse();
       replaceInvokeParameter(processResponse);
@@ -82,7 +90,7 @@ public class AsyncProcessor<T extends Processor> {
     }
 
     public <P> void replaceInvokeParameter(P replacement) {
-      if(replacement == null) {
+      if (replacement == null) {
         return;
       }
 
@@ -97,81 +105,107 @@ public class AsyncProcessor<T extends Processor> {
       invokeParameters = copy.toArray();
     }
 
-
     public ODataResponse getProcessResponse() {
       return processResponse;
     }
+
+    Object getWrappedInstance() {
+      return this.wrappedInstance;
+    }
   }
 
 
   public AsyncProcessor(T processor, Class<T> processorInterface, TechnicalAsyncService service) {
-      Class<? extends Processor> aClass = processor.getClass();
-      Class[] interfaces = aClass.getInterfaces();
-      handler = new MyInvocationHandler(processor);
-      Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler);
-      proxyProcessor = processorInterface.cast(proxyInstance);
-      this.service = service;
-    }
+    Class<? extends Processor> aClass = processor.getClass();
+    Class[] interfaces = aClass.getInterfaces();
+    handler = new ProcessorInvocationHandler(processor);
+    Object proxyInstance = Proxy.newProxyInstance(aClass.getClassLoader(), interfaces, handler);
+    proxyProcessor = processorInterface.cast(proxyInstance);
+    this.service = service;
+  }
 
-    public T prepareFor() {
-      return proxyProcessor;
-    }
+  public T prepareFor() {
+    return proxyProcessor;
+  }
 
-    public ODataRequest getRequest() {
-      return getParameter(ODataRequest.class);
-    }
+  public ODataRequest getRequest() {
+    return getParameter(ODataRequest.class);
+  }
 
-    public ODataResponse getResponse() {
-      return getParameter(ODataResponse.class);
-    }
+  public ODataResponse getResponse() {
+    return getParameter(ODataResponse.class);
+  }
 
-    public ODataResponse getProcessResponse() {
-      return handler.getProcessResponse();
-    }
+  public ODataResponse getProcessResponse() {
+    return handler.getProcessResponse();
+  }
 
-    private <P> P getParameter(Class<P> parameterClass) {
-      for (Object parameter : handler.getInvokeParameters()) {
-        if (parameter != null && parameterClass == parameter.getClass()) {
-          return parameterClass.cast(parameter);
-        }
+  private <P> P getParameter(Class<P> parameterClass) {
+    for (Object parameter : handler.getInvokeParameters()) {
+      if (parameter != null && parameterClass == parameter.getClass()) {
+        return parameterClass.cast(parameter);
       }
-      return null;
     }
+    return null;
+  }
 
-    public String processAsync() throws ODataApplicationException, ODataLibraryException {
-      preferHeader = getRequest().getHeader(HttpHeader.PREFER);
-      ODataRequest request = copyRequest(getRequest());
-      handler.replaceInvokeParameter(request);
-      handler.replaceInvokeParameter(new ODataResponse());
-      return service.processAsynchronous(this);
-    }
+  public String getPreferHeader() {
+    return preferHeader;
+  }
 
-    Object process() throws InvocationTargetException, IllegalAccessException {
-      return handler.process();
-    }
+  public String getLocation() {
+    return location;
+  }
 
-    private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException {
-      ODataRequest req = new ODataRequest();
-      req.setBody(copyRequestBody(request));
-      req.setMethod(request.getMethod());
-      req.setRawBaseUri(request.getRawBaseUri());
-      req.setRawODataPath(request.getRawODataPath());
-      req.setRawQueryPath(request.getRawQueryPath());
-      req.setRawRequestUri(request.getRawRequestUri());
-      req.setRawServiceResolutionUri(request.getRawServiceResolutionUri());
-
-      for (Map.Entry<String, List<String>> header : request.getAllHeaders().entrySet()) {
-        if(HttpHeader.PREFER.toLowerCase().equals(
-                header.getKey().toLowerCase())) {
-          preferHeader = header.getValue().get(0);
-        } else {
-          req.addHeader(header.getKey(), header.getValue());
-        }
-      }
+  public Class<?> getProcessorClass() {
+    return handler.getWrappedInstance().getClass();
+  }
+
+  /**
+   * Starts the asynchronous processing and returns the id for this process.
+   *
+   * @return the id for this process
+   * @throws ODataApplicationException
+   * @throws ODataLibraryException
+   */
+  public String processAsync() throws ODataApplicationException, ODataLibraryException {
+    preferHeader = getRequest().getHeader(HttpHeader.PREFER);
+    ODataRequest request = copyRequest(getRequest());
+    handler.replaceInvokeParameter(request);
+    handler.replaceInvokeParameter(new ODataResponse());
+    return service.processAsynchronous(this);
+  }
+
+  Object process() throws InvocationTargetException, IllegalAccessException {
+    return handler.process();
+  }
+
+  void setLocation(String loc) {
+    this.location = loc;
+  }
 
-      return req;
+  private ODataRequest copyRequest(ODataRequest request) throws ODataApplicationException {
+    ODataRequest req = new ODataRequest();
+    req.setBody(copyRequestBody(request));
+    req.setMethod(request.getMethod());
+    req.setRawBaseUri(request.getRawBaseUri());
+    req.setRawODataPath(request.getRawODataPath());
+    req.setRawQueryPath(request.getRawQueryPath());
+    req.setRawRequestUri(request.getRawRequestUri());
+    req.setRawServiceResolutionUri(request.getRawServiceResolutionUri());
+
+    for (Map.Entry<String, List<String>> header : request.getAllHeaders().entrySet()) {
+      if (HttpHeader.PREFER.toLowerCase().equals(
+          header.getKey().toLowerCase())) {
+        preferHeader = header.getValue().get(0);
+      } else {
+        req.addHeader(header.getKey(), header.getValue());
+      }
     }
 
+    return req;
+  }
+
   private InputStream copyRequestBody(ODataRequest request) throws ODataApplicationException {
     ByteArrayOutputStream buffer = new ByteArrayOutputStream();
     InputStream input = request.getBody();
@@ -193,16 +227,4 @@ public class AsyncProcessor<T extends Processor> {
     }
     return null;
   }
-
-  public String getPreferHeader() {
-      return preferHeader;
-    }
-
-    public String getLocation() {
-      return location;
-    }
-
-    void setLocation(String loc) {
-      this.location = loc;
-    }
-  }
+}

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/8f6ceeae/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
index 6a1cede..a7cfedf 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalAsyncService.java
@@ -18,20 +18,6 @@
  */
 package org.apache.olingo.server.tecsvc.async;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.olingo.commons.api.ODataRuntimeException;
 import org.apache.olingo.commons.api.format.ContentType;
 import org.apache.olingo.commons.api.http.HttpHeader;
@@ -46,17 +32,57 @@ import org.apache.olingo.server.api.serializer.SerializerException;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+/**
+ * The TechnicalAsyncService provides asynchronous support for any Processor.
+ * To use it, the following steps are necessary:
+ * <ul>
+ *   <li>Get the instance</li>
+ *   <li>Create an instance of the Processor which should be wrapped for asynchronous support
+ *   (do not forget to call the <code>init(...)</code> method on the processor)</li>
+ *   <li>Register the Processor instance via the <code>register(...)</code> method</li>
+ *   <li>Prepare the corresponding method with the request parameters via the
+ *   <code>prepareFor()</code> method of the AsyncProcessor</li>
+ *   <li>Start the async processing via the <code>processAsync()</code> method</li>
+ * </ul>
+ * A short code snippet is shown below:
+ * <pre>
+ * <code>
+ * TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
+ * TechnicalEntityProcessor processor = new TechnicalEntityProcessor(dataProvider, serviceMetadata);
+ * processor.init(odata, serviceMetadata);
+ * AsyncProcessor&lt;EntityProcessor&gt; asyncProcessor = asyncService.register(processor, EntityProcessor.class);
+ * asyncProcessor.prepareFor().readEntity(request, response, uriInfo, requestedFormat);
+ * String location = asyncProcessor.processAsync();
+ * </code>
+ * </pre>
+ */
 public class TechnicalAsyncService {
 
   public static final String TEC_ASYNC_SLEEP = "tec.sleep";
+  public static final String STATUS_MONITOR_TOKEN = "status";
 
   private static final Map<String, AsyncRunner> LOCATION_2_ASYNC_RUNNER =
       Collections.synchronizedMap(new HashMap<String, AsyncRunner>());
   private static final ExecutorService ASYNC_REQUEST_EXECUTOR = Executors.newFixedThreadPool(10);
   private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
-  public static final String STATUS_MONITOR_TOKEN = "status";
-
 
   public <T extends Processor> AsyncProcessor<T> register(T processor, Class<T> processorInterface) {
     return new AsyncProcessor<T>(processor, processorInterface, this);
@@ -73,7 +99,6 @@ public class TechnicalAsyncService {
     updateHeader(response, HttpStatusCode.ACCEPTED, location);
   }
 
-
   private static final class AsyncProcessorHolder {
     private static final TechnicalAsyncService INSTANCE = new TechnicalAsyncService();
   }
@@ -116,12 +141,27 @@ public class TechnicalAsyncService {
       } else {
         response.setStatus(HttpStatusCode.ACCEPTED.getStatusCode());
         response.setHeader(HttpHeader.LOCATION, location);
-        String content = "In progress for async location = " + location;
-        writeToResponse(response, content);
       }
     }
   }
 
+  public void listQueue(HttpServletResponse response) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("<html><header/><body><h1>Queued requests</h1><ul>");
+    for (Map.Entry<String, AsyncRunner> entry : LOCATION_2_ASYNC_RUNNER.entrySet()) {
+      AsyncProcessor asyncProcessor = entry.getValue().getDispatched();
+      sb.append("<li><b>ID: </b>").append(entry.getKey()).append("<br/>")
+          .append("<b>Location: </b>").append(asyncProcessor.getLocation()).append("<br/>")
+          .append("<b>Processor: </b>").append(asyncProcessor.getProcessorClass().getSimpleName()).append("<br/>")
+          .append("<b>Finished: </b>").append(entry.getValue().isFinished()).append("<br/>")
+          .append("</li>");
+    }
+    sb.append("</ul></body></html>");
+
+    writeToResponse(response, sb.toString());
+  }
+
+
   private static void writeToResponse(HttpServletResponse response, InputStream input) throws IOException {
     copy(input, response.getOutputStream());
   }
@@ -154,36 +194,47 @@ public class TechnicalAsyncService {
     writeToResponse(response, odResponseStream);
   }
 
-  static void writeHttpResponse(final ODataResponse odResponse, final HttpServletResponse response) throws IOException {
-    response.setStatus(odResponse.getStatusCode());
-
-    for (Map.Entry<String, String> entry : odResponse.getHeaders().entrySet()) {
-      response.setHeader(entry.getKey(), entry.getValue());
-    }
-
-    copy(odResponse.getContent(), response.getOutputStream());
-  }
+//  static void copy(final InputStream input, final OutputStream output) {
+//    if(output == null || input == null) {
+//      return;
+//    }
+//
+//    try {
+//      byte[] buffer = new byte[1024];
+//      int n;
+//      while (-1 != (n = input.read(buffer))) {
+//        output.write(buffer, 0, n);
+//      }
+//    } catch (IOException e) {
+//      throw new ODataRuntimeException(e);
+//    } finally {
+//      closeStream(output);
+//      closeStream(input);
+//    }
+//  }
 
   static void copy(final InputStream input, final OutputStream output) {
-    if(output == null || input == null) {
+    if (output == null || input == null) {
       return;
     }
 
     try {
-      byte[] buffer = new byte[1024];
-      int n;
-      while (-1 != (n = input.read(buffer))) {
-        output.write(buffer, 0, n);
+      ByteBuffer inBuffer = ByteBuffer.allocate(8192);
+      ReadableByteChannel ic = Channels.newChannel(input);
+      WritableByteChannel oc = Channels.newChannel(output);
+      while (ic.read(inBuffer) > 0) {
+        inBuffer.flip(); // switch from filling to draining: limit = bytes read, position = 0
+        oc.write(inBuffer);
+        inBuffer.clear(); // reset position AND limit; rewind() would keep the shrunken limit from flip()
       }
     } catch (IOException e) {
-      throw new ODataRuntimeException(e);
+      throw new ODataRuntimeException("Error on reading request content", e); // keep the I/O cause
     } finally {
-      closeStream(output);
       closeStream(input);
+      closeStream(output);
     }
   }
 
-
   private static void closeStream(final Closeable closeable) {
     if (closeable != null) {
       try {
@@ -194,7 +245,6 @@ public class TechnicalAsyncService {
     }
   }
 
-
   private String createNewAsyncLocation(ODataRequest request) {
     int pos = request.getRawBaseUri().lastIndexOf("/") + 1;
     return request.getRawBaseUri().substring(0, pos) + STATUS_MONITOR_TOKEN + "/" + ID_GENERATOR.incrementAndGet();
@@ -204,6 +254,9 @@ public class TechnicalAsyncService {
     return request.getRequestURL().toString();
   }
 
+  /**
+   * Runnable for the AsyncProcessor.
+   */
   private static class AsyncRunner implements Runnable {
     private final AsyncProcessor dispatched;
     private int defaultSleepTimeInSeconds = 0;

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/8f6ceeae/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
----------------------------------------------------------------------
diff --git a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
index 8fd22cf..6ed3722 100644
--- a/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
+++ b/lib/server-tecsvc/src/main/java/org/apache/olingo/server/tecsvc/async/TechnicalStatusMonitorServlet.java
@@ -18,33 +18,14 @@
  */
 package org.apache.olingo.server.tecsvc.async;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
-
-import org.apache.olingo.server.api.OData;
-import org.apache.olingo.server.api.ODataHttpHandler;
-import org.apache.olingo.server.api.ServiceMetadata;
-import org.apache.olingo.server.api.edmx.EdmxReference;
-import org.apache.olingo.server.api.edmx.EdmxReferenceInclude;
-import org.apache.olingo.server.tecsvc.ETagSupport;
-import org.apache.olingo.server.tecsvc.MetadataETagSupport;
-import org.apache.olingo.server.tecsvc.data.DataProvider;
-import org.apache.olingo.server.tecsvc.processor.TechnicalActionProcessor;
-import org.apache.olingo.server.tecsvc.processor.TechnicalBatchProcessor;
-import org.apache.olingo.server.tecsvc.processor.TechnicalEntityProcessor;
-import org.apache.olingo.server.tecsvc.processor.TechnicalPrimitiveComplexProcessor;
-import org.apache.olingo.server.tecsvc.provider.EdmTechProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
 public class TechnicalStatusMonitorServlet extends HttpServlet {
 
@@ -55,8 +36,10 @@ public class TechnicalStatusMonitorServlet extends HttpServlet {
   protected void service(final HttpServletRequest request, final HttpServletResponse response)
       throws ServletException, IOException {
     try {
-      if(TechnicalAsyncService.getInstance().isStatusMonitorResource(request)) {
-        TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
+      TechnicalAsyncService asyncService = TechnicalAsyncService.getInstance();
+      if("/list".equals(request.getPathInfo())) {
+        asyncService.listQueue(response);
+      } else if(asyncService.isStatusMonitorResource(request)) {
         asyncService.handle(request, response);
       }
     } catch (final Exception e) {