You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@olingo.apache.org by fm...@apache.org on 2014/04/28 13:10:57 UTC

git commit: [OLINGO-246, OLINGO-248] provided async batch support

Repository: olingo-odata4
Updated Branches:
  refs/heads/master 15e7718a0 -> 0b05798cd


[OLINGO-246, OLINGO-248] provided async batch support


Project: http://git-wip-us.apache.org/repos/asf/olingo-odata4/repo
Commit: http://git-wip-us.apache.org/repos/asf/olingo-odata4/commit/0b05798c
Tree: http://git-wip-us.apache.org/repos/asf/olingo-odata4/tree/0b05798c
Diff: http://git-wip-us.apache.org/repos/asf/olingo-odata4/diff/0b05798c

Branch: refs/heads/master
Commit: 0b05798cd943529bf0099cb05760f2baf1c6cf9b
Parents: 15e7718
Author: fmartelli <fa...@gmail.com>
Authored: Mon Apr 28 13:10:40 2014 +0200
Committer: fmartelli <fa...@gmail.com>
Committed: Mon Apr 28 13:10:40 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/olingo/fit/V4Services.java  |  71 +++++++++--
 .../request/v4/AsyncBatchRequestWrapper.java    |  48 ++++++++
 .../request/v4/AsyncRequestFactory.java         |   3 +
 .../response/v4/AsyncResponseWrapper.java       |  11 ++
 .../batch/ODataChangesetResponseItem.java       |  13 +-
 .../batch/ODataRetrieveResponseItem.java        |   7 +-
 .../request/batch/v4/ODataBatchRequestImpl.java |   3 +
 .../v4/AsyncBatchRequestWrapperImpl.java        | 119 +++++++++++++++++++
 .../request/v4/AsyncRequestFactoryImpl.java     |   7 ++
 .../request/v4/AsyncRequestWrapperImpl.java     |  70 ++++++-----
 .../response/AbstractODataResponse.java         |   6 +-
 .../response/v4/AsyncResponseImpl.java          |  27 +++++
 .../client/core/it/v4/BatchTestITCase.java      |  84 ++++++++++++-
 13 files changed, 415 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/fit/src/main/java/org/apache/olingo/fit/V4Services.java
----------------------------------------------------------------------
diff --git a/fit/src/main/java/org/apache/olingo/fit/V4Services.java b/fit/src/main/java/org/apache/olingo/fit/V4Services.java
index 25eac89..38d8696 100644
--- a/fit/src/main/java/org/apache/olingo/fit/V4Services.java
+++ b/fit/src/main/java/org/apache/olingo/fit/V4Services.java
@@ -51,12 +51,15 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.cxf.interceptor.InInterceptors;
 import org.apache.cxf.jaxrs.ext.multipart.Attachment;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.apache.cxf.jaxrs.ext.multipart.MultipartBody;
 import org.apache.olingo.commons.api.data.CollectionValue;
 import org.apache.olingo.commons.api.data.ResWrap;
 import org.apache.olingo.commons.api.data.Entry;
 import org.apache.olingo.commons.api.data.Feed;
 import org.apache.olingo.commons.api.data.Property;
 import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
+import org.apache.olingo.commons.api.format.ContentType;
 import org.apache.olingo.commons.core.data.AtomEntryImpl;
 import org.apache.olingo.commons.core.data.AtomFeedImpl;
 import org.apache.olingo.commons.core.data.AtomPropertyImpl;
@@ -152,6 +155,54 @@ public class V4Services extends AbstractServices {
     }
   }
 
+  /**
+   * Registers a canned multipart body (wrapping one 202 Accepted part) under a random id and answers via
+   * {@code xml.createAsyncResponse} with the matching monitor URL; prefer and attachment are not read.
+   */
+  @POST
+  @Path("/async/$batch")
+  public Response async(
+          @Context final UriInfo uriInfo,
+          @HeaderParam("Prefer") @DefaultValue(StringUtils.EMPTY) final String prefer,
+          final @Multipart MultipartBody attachment) {
+    try {
+      final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      bos.write("HTTP/1.1 200 Ok".getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write("OData-Version: 4.0".getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write(("Content-Type: " + ContentType.APPLICATION_OCTET_STREAM + ";boundary=" + BOUNDARY).
+              getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write(CRLF);
+      // single batch part: an application/http envelope
+      bos.write(("--" + BOUNDARY).getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write("Content-Type: application/http".getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write("Content-Transfer-Encoding: binary".getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write(CRLF);
+      // embedded response: 202 Accepted with a fixed monitor location and retry delay
+      bos.write("HTTP/1.1 202 Accepted".getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write("Location: http://service-root/async-monitor".getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write("Retry-After: 10".getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+      bos.write(CRLF);
+      bos.write(("--" + BOUNDARY + "--").getBytes(Constants.ENCODING));
+      bos.write(CRLF);
+
+      final UUID uuid = UUID.randomUUID();
+      providedAsync.put(uuid.toString(), bos.toString(Constants.ENCODING.toString()));
+      return xml.createAsyncResponse(
+              uriInfo.getRequestUri().toASCIIString().replace("async/$batch", "") + "monitor/" + uuid.toString());
+    } catch (Exception e) {
+      return xml.createFaultResponse(Accept.JSON.toString(), e);
+    }
+  }
+
   @GET
   @Path("/async/{name}")
   public Response async(
@@ -568,7 +619,7 @@ public class V4Services extends AbstractServices {
 
       return utils.getValue().createResponse(
               FSManager.instance(version).readFile(Constants.get(version, ConstantKey.REF)
-                      + File.separatorChar + filename, utils.getKey()),
+              + File.separatorChar + filename, utils.getKey()),
               null,
               utils.getKey());
     } catch (Exception e) {
@@ -590,7 +641,7 @@ public class V4Services extends AbstractServices {
 
     final Response response =
             getEntityInternal(uriInfo.getRequestUri().toASCIIString(),
-                    accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false);
+            accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false);
     return response.getStatus() >= 400
             ? postNewEntity(uriInfo, accept, contentType, prefer, entitySetName, changes)
             : super.patchEntity(uriInfo, accept, contentType, prefer, ifMatch, entitySetName, entityId, changes);
@@ -688,8 +739,8 @@ public class V4Services extends AbstractServices {
       } else {
         final ResWrap<JSONEntryImpl> jcontainer =
                 mapper.readValue(IOUtils.toInputStream(entity, Constants.ENCODING),
-                        new TypeReference<JSONEntryImpl>() {
-                        });
+                new TypeReference<JSONEntryImpl>() {
+        });
 
         entry = dataBinder.toAtomEntry(jcontainer.getPayload());
 
@@ -787,7 +838,7 @@ public class V4Services extends AbstractServices {
 
         final ResWrap<JSONEntryImpl> jsonContainer = mapper.readValue(
                 IOUtils.toInputStream(changes, Constants.ENCODING), new TypeReference<JSONEntryImpl>() {
-                });
+        });
         jsonContainer.getPayload().setType(typeInfo.getFullQualifiedName().toString());
         entryChanges = dataBinder.toAtomEntry(jsonContainer.getPayload());
       }
@@ -820,7 +871,7 @@ public class V4Services extends AbstractServices {
       // 1. Fetch the contained entity to be removed
       final InputStream entry = FSManager.instance(version).
               readFile(containedPath(entityId, containedEntitySetName).
-                      append('(').append(containedEntityId).append(')').toString(), Accept.ATOM);
+              append('(').append(containedEntityId).append(')').toString(), Accept.ATOM);
       final ResWrap<AtomEntryImpl> container = atomDeserializer.read(entry, AtomEntryImpl.class);
 
       // 2. Remove the contained entity
@@ -1049,8 +1100,8 @@ public class V4Services extends AbstractServices {
       } else {
         final ResWrap<JSONPropertyImpl> paramContainer =
                 mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING),
-                        new TypeReference<JSONPropertyImpl>() {
-                        });
+                new TypeReference<JSONPropertyImpl>() {
+        });
         property = paramContainer.getPayload();
       }
 
@@ -1091,8 +1142,8 @@ public class V4Services extends AbstractServices {
       } else {
         final ResWrap<JSONPropertyImpl> paramContainer =
                 mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING),
-                        new TypeReference<JSONPropertyImpl>() {
-                        });
+                new TypeReference<JSONPropertyImpl>() {
+        });
         property = paramContainer.getPayload();
       }
 

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java
----------------------------------------------------------------------
diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java
new file mode 100644
index 0000000..9efa0fd
--- /dev/null
+++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.api.communication.request.v4;
+
+import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
+import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate;
+import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
+
+/** Async request wrapper specialised for batch requests; exposes the factories for the batch items. */
+public interface AsyncBatchRequestWrapper extends AsyncRequestWrapper<ODataBatchResponse> {
+  /**
+   * Gets a changeset batch item instance. A changeset can be submitted embedded into a batch request only.
+   *
+   * @return ODataChangeset instance.
+   */
+  ODataChangeset addChangeset();
+
+  /**
+   * Gets a retrieve batch item instance. A retrieve item can be submitted embedded into a batch request only.
+   *
+   * @return ODataRetrieve instance.
+   */
+  ODataRetrieve addRetrieve();
+
+  /**
+   * Gets an outside change batch item instance. An outside item can be submitted embedded into a batch request only.
+   *
+   * @return ODataOutsideUpdate instance.
+   */
+  ODataOutsideUpdate addOutsideUpdate();
+}

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
----------------------------------------------------------------------
diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
index 9fd06dc..17b510f 100644
--- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
+++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
@@ -19,10 +19,13 @@
 package org.apache.olingo.client.api.communication.request.v4;
 
 import org.apache.olingo.client.api.communication.request.ODataRequest;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
 
 @SuppressWarnings("unchecked")
 public interface AsyncRequestFactory {
 
   <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest);
+
+  AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest);
 }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
----------------------------------------------------------------------
diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
index 87b6904..04d3688 100644
--- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
+++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.olingo.client.api.communication.response.v4;
 
+import java.net.URI;
 import org.apache.olingo.client.api.communication.response.ODataDeleteResponse;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
 
@@ -50,6 +51,16 @@ public interface AsyncResponseWrapper<R extends ODataResponse> {
   R getODataResponse();
 
   /**
+   * Specifies the location for the next monitor check.
+   * <br />
+   * Overrides the location value retrieved among headers and nullifies the previous valid response (if exists).
+   *
+   * @param uri monitor location.
+   * @return the current async response wrapper.
+   */
+  AsyncResponseWrapper<R> forceNextMonitorCheck(URI uri);
+
+  /**
    * DeleteA DELETE request sent to the status monitor resource requests that the asynchronous processing be canceled. A
    * 200 OK or to a 204 No Content response indicates that the asynchronous processing has been successfully canceled.
    *

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
index 53eeb16..51f3a48 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
@@ -25,6 +25,7 @@ import org.apache.olingo.client.api.ODataBatchConstants;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
 import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG;
 import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse;
+import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl;
 
 /**
  * Changeset wrapper for the corresponding batch item.
@@ -116,11 +117,15 @@ public class ODataChangesetResponseItem extends AbstractODataBatchResponseItem {
     final Map.Entry<Integer, String> responseLine = ODataBatchUtilities.readResponseLine(batchLineIterator);
     LOG.debug("Retrieved item response {}", responseLine);
 
-    if (responseLine.getKey() >= 400) {
-      // generate error response
-      final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
-      LOG.debug("Retrieved item headers {}", headers);
+    final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
+    LOG.debug("Retrieved item headers {}", headers);
 
+    if (responseLine.getKey() == 202) {
+      // generate async response
+      current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary);
+      return current;
+    } else if (responseLine.getKey() >= 400) {
+      // generate error response
       current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary);
       return current;
     }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
index 8919ffe..a180b37 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
@@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
 import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG;
 import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse;
+import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl;
 
 /**
  * Retrieve response wrapper for the corresponding batch item.
@@ -58,7 +59,11 @@ public class ODataRetrieveResponseItem extends AbstractODataBatchResponseItem {
     final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
     LOG.debug("Retrieved item headers {}", headers);
 
-    if (responseLine.getKey() >= 400) {
+    if (responseLine.getKey() == 202) {
+      // generate async response
+      current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary);
+      breakingitem = true;
+    } else if (responseLine.getKey() >= 400) {
       // generate error response
       current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary);
       breakingitem = true;

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
index 648cb35..a445013 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
@@ -122,6 +122,9 @@ public class ODataBatchRequestImpl
    */
   protected class ODataBatchResponseImpl extends AbstractODataResponse implements ODataBatchResponse {
 
+    private ODataBatchResponseImpl() {
+    }
+
     /**
      * Constructor.
      *

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java
new file mode 100644
index 0000000..8a3d903
--- /dev/null
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.core.communication.request.v4;
+
+import java.net.URI;
+import java.util.Collection;
+import org.apache.commons.io.IOUtils;
+import org.apache.olingo.client.api.communication.header.HeaderName;
+import org.apache.olingo.client.api.communication.header.ODataPreferences;
+import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
+import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve;
+import org.apache.olingo.client.api.communication.request.batch.v4.BatchStreamManager;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate;
+import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
+import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
+import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper;
+import org.apache.olingo.client.api.v4.ODataClient;
+import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
+
+/**
+ * Async wrapper for a batch request; batch items are created through the request's stream manager.
+ */
+public class AsyncBatchRequestWrapperImpl extends AsyncRequestWrapperImpl<ODataBatchResponse>
+        implements AsyncBatchRequestWrapper {
+
+  private final BatchStreamManager batchStreamManager;
+
+  /** Wraps the batch request and calls {@code execute()} on it right away to obtain its stream manager. */
+  protected AsyncBatchRequestWrapperImpl(final ODataClient odataClient, final ODataBatchRequest odataRequest) {
+    super(odataClient, odataRequest);
+    batchStreamManager = odataRequest.execute();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ODataChangeset addChangeset() {
+    return batchStreamManager.addChangeset();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ODataRetrieve addRetrieve() {
+    return batchStreamManager.addRetrieve();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ODataOutsideUpdate addOutsideUpdate() {
+    return batchStreamManager.addOutsideUpdate();
+  }
+
+  /** Wraps the batch response obtained from the stream manager. */
+  @Override
+  public AsyncResponseWrapper<ODataBatchResponse> execute() {
+    return new AsyncResponseWrapperImpl(batchStreamManager.getResponse());
+  }
+
+  public class AsyncResponseWrapperImpl
+          extends AsyncRequestWrapperImpl<ODataBatchResponse>.AsyncResponseWrapperImpl {
+
+    /**
+     * Reads the monitor details when the status is 202, otherwise keeps {@code res} as the response.
+     *
+     * @param res OData batch response.
+     */
+    public AsyncResponseWrapperImpl(final ODataBatchResponse res) {
+      super();
+
+      if (res.getStatusCode() == 202) {
+        retrieveMonitorDetails(res);
+      } else {
+        response = res;
+      }
+    }
+
+    /**
+     * Reads monitor location, retry delay and applied preference; the raw response is always closed.
+     */
+    private void retrieveMonitorDetails(final ODataBatchResponse res) {
+      try {
+        Collection<String> headers = res.getHeader(HeaderName.location.toString());
+        if (headers == null || headers.isEmpty()) {
+          throw new AsyncRequestException("Invalid async request response. Monitor URL not found");
+        }
+        this.location = URI.create(headers.iterator().next());
+        headers = res.getHeader(HeaderName.retryAfter.toString());
+        if (headers != null && !headers.isEmpty()) {
+          this.retryAfter = Integer.parseInt(headers.iterator().next());
+        }
+        headers = res.getHeader(HeaderName.preferenceApplied.toString());
+        if (headers != null && !headers.isEmpty()) {
+          final String respondAsync = new ODataPreferences(ODataServiceVersion.V40).respondAsync();
+          for (String header : headers) {
+            preferenceApplied |= header.equalsIgnoreCase(respondAsync);
+          }
+        }
+      } finally {
+        IOUtils.closeQuietly(res.getRawResponse());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
index d6905dc..d5331bc 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
@@ -19,6 +19,8 @@
 package org.apache.olingo.client.core.communication.request.v4;
 
 import org.apache.olingo.client.api.communication.request.ODataRequest;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
+import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
 import org.apache.olingo.client.api.communication.request.v4.AsyncRequestFactory;
 import org.apache.olingo.client.api.communication.request.v4.AsyncRequestWrapper;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
@@ -39,4 +41,9 @@ public class AsyncRequestFactoryImpl implements AsyncRequestFactory {
   public <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest) {
     return new AsyncRequestWrapperImpl<R>(client, odataRequest);
   }
+
+  @Override
+  public AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest) {
+    return new AsyncBatchRequestWrapperImpl(client, odataRequest);
+  }
 }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
index 57305cd..7aef8a9 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
@@ -41,7 +41,6 @@ import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapp
 import org.apache.olingo.client.api.http.HttpClientException;
 import org.apache.olingo.client.api.http.HttpMethod;
 import org.apache.olingo.client.api.v4.ODataClient;
-import org.apache.olingo.client.core.communication.header.ODataHeadersImpl;
 import org.apache.olingo.client.core.communication.request.AbstractODataRequest;
 import org.apache.olingo.client.core.communication.request.AbstractRequest;
 import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
@@ -49,34 +48,29 @@ import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
 public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRequest
         implements AsyncRequestWrapper<R> {
 
-  private static final int MAX_RETRY = 5;
+  protected static final int MAX_RETRY = 5;
 
-  private final ODataClient odataClient;
+  protected final ODataClient odataClient;
 
   /**
    * Request to be wrapped.
    */
-  private final ODataRequest odataRequest;
+  protected final ODataRequest odataRequest;
 
   /**
    * HTTP client.
    */
-  private final HttpClient httpClient;
+  protected final HttpClient httpClient;
 
   /**
    * HTTP request.
    */
-  private final HttpUriRequest request;
-
-  /**
-   * OData request header.
-   */
-  private final ODataHeadersImpl odataHeaders;
+  protected final HttpUriRequest request;
 
   /**
    * Target URI.
    */
-  private final URI uri;
+  protected final URI uri;
 
   protected AsyncRequestWrapperImpl(final ODataClient odataClient, final ODataRequest odataRequest) {
     this.odataRequest = odataRequest;
@@ -88,9 +82,6 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
     this.odataClient = odataClient;
     final HttpMethod method = odataRequest.getMethod();
 
-    // initialize default headers
-    this.odataHeaders = (ODataHeadersImpl) odataClient.getVersionHeaders();
-
     // target uri
     this.uri = odataRequest.getURI();
 
@@ -104,19 +95,19 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
   }
 
   @Override
-  public AsyncRequestWrapper<R> wait(final int waitInSeconds) {
+  public final AsyncRequestWrapper<R> wait(final int waitInSeconds) {
     extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).wait(waitInSeconds));
     return this;
   }
 
   @Override
-  public AsyncRequestWrapper<R> callback(URI url) {
+  public final AsyncRequestWrapper<R> callback(URI url) {
     extendHeader(HeaderName.prefer.toString(),
             new ODataPreferences(ODataServiceVersion.V40).callback(url.toASCIIString()));
     return this;
   }
 
-  private void extendHeader(final String headerName, final String headerValue) {
+  protected final void extendHeader(final String headerName, final String headerValue) {
     final StringBuilder extended = new StringBuilder();
     if (this.odataRequest.getHeaderNames().contains(headerName)) {
       extended.append(this.odataRequest.getHeader(headerName)).append(", ");
@@ -130,7 +121,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
     return new AsyncResponseWrapperImpl(doExecute());
   }
 
-  private HttpResponse doExecute() {
+  protected HttpResponse doExecute() {
     // Add all available headers
     for (String key : odataRequest.getHeaderNames()) {
       final String value = odataRequest.getHeader(key);
@@ -143,13 +134,16 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
 
   public class AsyncResponseWrapperImpl implements AsyncResponseWrapper<R> {
 
-    private URI location = null;
+    protected URI location = null;
 
-    private R response = null;
+    protected R response = null;
 
-    private int retryAfter = 5;
+    protected int retryAfter = 5;
 
-    private boolean preferenceApplied = false;
+    protected boolean preferenceApplied = false;
+
+    public AsyncResponseWrapperImpl() {
+    }
 
     /**
      * Constructor.
@@ -159,7 +153,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
     @SuppressWarnings("unchecked")
     public AsyncResponseWrapperImpl(final HttpResponse res) {
       if (res.getStatusLine().getStatusCode() == 202) {
-        retrieveMonitorDetails(res, true);
+        retrieveMonitorDetails(res);
       } else {
         response = (R) ((AbstractODataRequest<?>) odataRequest).getResponseTemplate().initFromHttpResponse(res);
       }
@@ -177,7 +171,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
         final HttpResponse res = checkMonitor(location);
 
         if (res.getStatusLine().getStatusCode() == 202) {
-          retrieveMonitorDetails(res, false);
+          retrieveMonitorDetails(res);
         } else {
           response = instantiateResponse(res);
         }
@@ -219,18 +213,34 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
       return response;
     }
 
+    /**
+     * {@inheritDoc} Builds a delete request for the current {@code location} and executes it directly.
+     */
     @Override
     public ODataDeleteResponse delete() {
       final ODataDeleteRequest deleteRequest = odataClient.getCUDRequestFactory().getDeleteRequest(location);
       return deleteRequest.execute();
     }
 
+    /**
+     * {@inheritDoc} Executes a delete request for the current {@code location} through an async request wrapper.
+     */
     @Override
     public AsyncResponseWrapper<ODataDeleteResponse> asyncDelete() {
       return odataClient.getAsyncRequestFactory().<ODataDeleteResponse>getAsyncRequestWrapper(
               odataClient.getCUDRequestFactory().getDeleteRequest(location)).execute();
     }
 
+    /**
+     * {@inheritDoc} Stores {@code uri} as the new location, clears the held response and returns this wrapper.
+     */
+    @Override
+    public AsyncResponseWrapper<R> forceNextMonitorCheck(final URI uri) {
+      this.location = uri;
+      this.response = null;
+      return this;
+    }
+
     @SuppressWarnings("unchecked")
     private R instantiateResponse(final HttpResponse res) {
       R odataResponse;
@@ -246,7 +256,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
       return odataResponse;
     }
 
-    private void retrieveMonitorDetails(final HttpResponse res, final boolean includePreferenceApplied) {
+    private void retrieveMonitorDetails(final HttpResponse res) {
       Header[] headers = res.getHeaders(HeaderName.location.toString());
       if (ArrayUtils.isNotEmpty(headers)) {
         this.location = URI.create(headers[0].getValue());
@@ -276,7 +286,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
     }
   }
 
-  private HttpResponse checkMonitor(final URI location) {
+  protected final HttpResponse checkMonitor(final URI location) {
     if (location == null) {
       throw new AsyncRequestException("Invalid async request response. Missing monitor URL");
     }
@@ -287,10 +297,8 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
     return executeHttpRequest(httpClient, monitor);
   }
 
-  private HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) {
-    checkRequest(odataClient, request);
-
-    HttpResponse response;
+  protected final HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) {
+    final HttpResponse response;
     try {
       response = client.execute(req);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
index d66fa20..fe8fa52 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -35,6 +34,7 @@ import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
 import org.apache.olingo.client.api.communication.header.HeaderName;
+import org.apache.olingo.client.api.communication.request.ODataStreamer;
 import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
 import org.apache.olingo.client.api.http.NoContentException;
@@ -257,10 +257,10 @@ public abstract class AbstractODataResponse implements ODataResponse {
       this.headers.putAll(partHeaders);
 
       final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      LOG.debug("Retrieved payload {}", bos.toString(Charset.forName("UTF-8").toString()));
 
       while (batchLineIterator.hasNext()) {
-        bos.write(batchLineIterator.nextLine().getBytes());
+        bos.write(batchLineIterator.nextLine().getBytes(Constants.UTF8));
+        bos.write(ODataStreamer.CRLF);
       }
 
       try {

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
index cf7da13..a800037 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
@@ -18,9 +18,13 @@
  */
 package org.apache.olingo.client.core.communication.response.v4;
 
+import java.util.Collection;
+import java.util.Map;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
+import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
 import org.apache.olingo.client.api.communication.response.v4.AsyncResponse;
+import org.apache.olingo.client.core.communication.request.batch.ODataBatchController;
 import org.apache.olingo.client.core.communication.response.AbstractODataResponse;
 
 /**
@@ -46,4 +50,27 @@ public class AsyncResponseImpl extends AbstractODataResponse implements AsyncRes
   public AsyncResponseImpl(final HttpClient client, final HttpResponse res) {
     super(client, res);
   }
+
+  /**
+   * Constructor to be used inside a batch item; sets status, headers and batch controller from the given arguments.
+   */
+  public AsyncResponseImpl(
+          final Map.Entry<Integer, String> responseLine, // key: status code, value: status message
+          final Map<String, Collection<String>> headers, // copied into this response's headers
+          final ODataBatchLineIterator batchLineIterator, // passed, with boundary, to the ODataBatchController
+          final String boundary) {
+    super();
+
+    if (hasBeenInitialized) { // NOTE(review): message says "Request" although this is a response -- confirm wording
+      throw new IllegalStateException("Request already initialized");
+    }
+
+    this.hasBeenInitialized = true;
+
+    this.batchInfo = new ODataBatchController(batchLineIterator, boundary);
+
+    this.statusCode = responseLine.getKey();
+    this.statusMessage = responseLine.getValue();
+    this.headers.putAll(headers);
+  }
 }

http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
index 8a798dd..6c24789 100644
--- a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
+++ b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
@@ -26,12 +26,14 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.http.HttpResponse;
 import org.apache.olingo.client.api.ODataBatchConstants;
+import org.apache.olingo.client.api.communication.header.HeaderName;
 import org.apache.olingo.client.api.communication.request.ODataStreamManager;
 import org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem;
 import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
@@ -44,10 +46,13 @@ import org.apache.olingo.client.api.communication.request.cud.ODataEntityUpdateR
 import org.apache.olingo.client.api.communication.request.cud.v4.UpdateType;
 import org.apache.olingo.client.api.communication.request.retrieve.ODataEntityRequest;
 import org.apache.olingo.client.api.communication.request.retrieve.ODataEntitySetRequest;
+import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
 import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
 import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse;
 import org.apache.olingo.client.api.communication.response.ODataEntityUpdateResponse;
 import org.apache.olingo.client.api.communication.response.ODataResponse;
+import org.apache.olingo.client.api.communication.response.v4.AsyncResponse;
+import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper;
 import org.apache.olingo.client.api.uri.v4.URIBuilder;
 import org.apache.olingo.client.core.communication.request.AbstractODataStreamManager;
 import org.apache.olingo.client.core.communication.request.Wrapper;
@@ -397,7 +402,7 @@ public class BatchTestITCase extends AbstractTestITCase {
   }
 
   @Test
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({"unchecked"})
   public void batchRequest() throws EdmPrimitiveTypeException {
     // create your request
     final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(testStaticServiceRootURL);
@@ -412,8 +417,7 @@ public class BatchTestITCase extends AbstractTestITCase {
 
     // prepare URI
     URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL);
-    targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);//.
-//            expand("Orders").select("PersonID,Orders/OrderID");
+    targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);
 
     // create new request
     ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
@@ -520,9 +524,79 @@ public class BatchTestITCase extends AbstractTestITCase {
 
     entres = (ODataEntityRequestImpl.ODataEntityResponseImpl) res;
     entity = entres.getBody();
-    assertEquals("new last name",
-            entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class));
+    assertEquals("new last name", entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class));
+
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void async() {
+    // create your request
+    final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(
+            URI.create(testStaticServiceRootURL + "/async/").normalize().toASCIIString());
+    request.setAccept(ACCEPT);
+
+    final AsyncBatchRequestWrapper async = client.getAsyncRequestFactory().getAsyncBatchRequestWrapper(request);
+
+    // -------------------------------------------
+    // Add first retrieve item: People(5), JSON format
+    // -------------------------------------------
+    ODataRetrieve retrieve = async.addRetrieve();
 
+    // prepare URI
+    URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL);
+    targetURI.appendEntitySetSegment("People").appendKeySegment(5);
+
+    // create new request
+    ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
+    queryReq.setFormat(ODataPubFormat.JSON);
+
+    retrieve.setRequest(queryReq);
+    // -------------------------------------------
+
+    // -------------------------------------------
+    // Add second retrieve item: Customers(1)
+    // -------------------------------------------
+    retrieve = async.addRetrieve();
+
+    // prepare URI
+    targetURI = client.getURIBuilder(testStaticServiceRootURL).appendEntitySetSegment("Customers").appendKeySegment(1);
+
+    // create new request
+    queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
+
+    retrieve.setRequest(queryReq);
+    // -------------------------------------------
+
+    final AsyncResponseWrapper<ODataBatchResponse> responseWrapper = async.execute();
+
+    assertTrue(responseWrapper.isPreferenceApplied());
+    assertTrue(responseWrapper.isDone());
+
+    final ODataBatchResponse response = responseWrapper.getODataResponse();
+
+    assertEquals(200, response.getStatusCode());
+    assertEquals("Ok", response.getStatusMessage());
+    final Iterator<ODataBatchResponseItem> iter = response.getBody();
+
+    // retrieve the first item (ODataRetrieve)
+    ODataBatchResponseItem item = iter.next();
+    assertTrue(item instanceof ODataRetrieveResponseItem);
+
+    // The service returns interim results to an asynchronously executing batch.
+    ODataRetrieveResponseItem retitem = (ODataRetrieveResponseItem) item;
+    ODataResponse res = retitem.next();
+    assertTrue(res instanceof AsyncResponse);
+    assertEquals(202, res.getStatusCode());
+    assertEquals("Accepted", res.getStatusMessage());
+
+    Collection<String> newMonitorLocation = res.getHeader(HeaderName.location);
+    if (newMonitorLocation != null && !newMonitorLocation.isEmpty()) {
+      responseWrapper.forceNextMonitorCheck(URI.create(newMonitorLocation.iterator().next()));
+      // .... now you can start again with isDone() and getODataResponse().
+    }
+
+    assertFalse(retitem.hasNext());
     assertFalse(iter.hasNext());
   }