You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@olingo.apache.org by fm...@apache.org on 2014/04/28 13:10:57 UTC
git commit: [OLINGO-246, OLINGO-248] provided async batch support
Repository: olingo-odata4
Updated Branches:
refs/heads/master 15e7718a0 -> 0b05798cd
[OLINGO-246, OLINGO-248] provided async batch support
Project: http://git-wip-us.apache.org/repos/asf/olingo-odata4/repo
Commit: http://git-wip-us.apache.org/repos/asf/olingo-odata4/commit/0b05798c
Tree: http://git-wip-us.apache.org/repos/asf/olingo-odata4/tree/0b05798c
Diff: http://git-wip-us.apache.org/repos/asf/olingo-odata4/diff/0b05798c
Branch: refs/heads/master
Commit: 0b05798cd943529bf0099cb05760f2baf1c6cf9b
Parents: 15e7718
Author: fmartelli <fa...@gmail.com>
Authored: Mon Apr 28 13:10:40 2014 +0200
Committer: fmartelli <fa...@gmail.com>
Committed: Mon Apr 28 13:10:40 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/olingo/fit/V4Services.java | 71 +++++++++--
.../request/v4/AsyncBatchRequestWrapper.java | 48 ++++++++
.../request/v4/AsyncRequestFactory.java | 3 +
.../response/v4/AsyncResponseWrapper.java | 11 ++
.../batch/ODataChangesetResponseItem.java | 13 +-
.../batch/ODataRetrieveResponseItem.java | 7 +-
.../request/batch/v4/ODataBatchRequestImpl.java | 3 +
.../v4/AsyncBatchRequestWrapperImpl.java | 119 +++++++++++++++++++
.../request/v4/AsyncRequestFactoryImpl.java | 7 ++
.../request/v4/AsyncRequestWrapperImpl.java | 70 ++++++-----
.../response/AbstractODataResponse.java | 6 +-
.../response/v4/AsyncResponseImpl.java | 27 +++++
.../client/core/it/v4/BatchTestITCase.java | 84 ++++++++++++-
13 files changed, 415 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/fit/src/main/java/org/apache/olingo/fit/V4Services.java
----------------------------------------------------------------------
diff --git a/fit/src/main/java/org/apache/olingo/fit/V4Services.java b/fit/src/main/java/org/apache/olingo/fit/V4Services.java
index 25eac89..38d8696 100644
--- a/fit/src/main/java/org/apache/olingo/fit/V4Services.java
+++ b/fit/src/main/java/org/apache/olingo/fit/V4Services.java
@@ -51,12 +51,15 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.cxf.interceptor.InInterceptors;
import org.apache.cxf.jaxrs.ext.multipart.Attachment;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.apache.cxf.jaxrs.ext.multipart.MultipartBody;
import org.apache.olingo.commons.api.data.CollectionValue;
import org.apache.olingo.commons.api.data.ResWrap;
import org.apache.olingo.commons.api.data.Entry;
import org.apache.olingo.commons.api.data.Feed;
import org.apache.olingo.commons.api.data.Property;
import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
+import org.apache.olingo.commons.api.format.ContentType;
import org.apache.olingo.commons.core.data.AtomEntryImpl;
import org.apache.olingo.commons.core.data.AtomFeedImpl;
import org.apache.olingo.commons.core.data.AtomPropertyImpl;
@@ -152,6 +155,54 @@ public class V4Services extends AbstractServices {
}
}
+ @POST
+ @Path("/async/$batch")
+ public Response async(
+ @Context final UriInfo uriInfo,
+ @HeaderParam("Prefer") @DefaultValue(StringUtils.EMPTY) final String prefer,
+ final @Multipart MultipartBody attachment) {
+
+ try {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ bos.write("HTTP/1.1 200 Ok".getBytes());
+ bos.write(CRLF);
+ bos.write("OData-Version: 4.0".getBytes());
+ bos.write(CRLF);
+ bos.write(("Content-Type: " + ContentType.APPLICATION_OCTET_STREAM + ";boundary=" + BOUNDARY).getBytes());
+ bos.write(CRLF);
+ bos.write(CRLF);
+
+ bos.write(("--" + BOUNDARY).getBytes());
+ bos.write(CRLF);
+ bos.write("Content-Type: application/http".getBytes());
+ bos.write(CRLF);
+ bos.write("Content-Transfer-Encoding: binary".getBytes());
+ bos.write(CRLF);
+ bos.write(CRLF);
+
+ bos.write("HTTP/1.1 202 Accepted".getBytes());
+ bos.write(CRLF);
+ bos.write("Location: http://service-root/async-monitor".getBytes());
+ bos.write(CRLF);
+ bos.write("Retry-After: 10".getBytes());
+ bos.write(CRLF);
+ bos.write(CRLF);
+ bos.write(("--" + BOUNDARY + "--").getBytes());
+ bos.write(CRLF);
+
+ final UUID uuid = UUID.randomUUID();
+ providedAsync.put(uuid.toString(), bos.toString(Constants.ENCODING.toString()));
+
+ bos.flush();
+ bos.close();
+
+ return xml.createAsyncResponse(
+ uriInfo.getRequestUri().toASCIIString().replace("async/$batch", "") + "monitor/" + uuid.toString());
+ } catch (Exception e) {
+ return xml.createFaultResponse(Accept.JSON.toString(), e);
+ }
+ }
+
@GET
@Path("/async/{name}")
public Response async(
@@ -568,7 +619,7 @@ public class V4Services extends AbstractServices {
return utils.getValue().createResponse(
FSManager.instance(version).readFile(Constants.get(version, ConstantKey.REF)
- + File.separatorChar + filename, utils.getKey()),
+ + File.separatorChar + filename, utils.getKey()),
null,
utils.getKey());
} catch (Exception e) {
@@ -590,7 +641,7 @@ public class V4Services extends AbstractServices {
final Response response =
getEntityInternal(uriInfo.getRequestUri().toASCIIString(),
- accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false);
+ accept, entitySetName, entityId, accept, StringUtils.EMPTY, StringUtils.EMPTY, false);
return response.getStatus() >= 400
? postNewEntity(uriInfo, accept, contentType, prefer, entitySetName, changes)
: super.patchEntity(uriInfo, accept, contentType, prefer, ifMatch, entitySetName, entityId, changes);
@@ -688,8 +739,8 @@ public class V4Services extends AbstractServices {
} else {
final ResWrap<JSONEntryImpl> jcontainer =
mapper.readValue(IOUtils.toInputStream(entity, Constants.ENCODING),
- new TypeReference<JSONEntryImpl>() {
- });
+ new TypeReference<JSONEntryImpl>() {
+ });
entry = dataBinder.toAtomEntry(jcontainer.getPayload());
@@ -787,7 +838,7 @@ public class V4Services extends AbstractServices {
final ResWrap<JSONEntryImpl> jsonContainer = mapper.readValue(
IOUtils.toInputStream(changes, Constants.ENCODING), new TypeReference<JSONEntryImpl>() {
- });
+ });
jsonContainer.getPayload().setType(typeInfo.getFullQualifiedName().toString());
entryChanges = dataBinder.toAtomEntry(jsonContainer.getPayload());
}
@@ -820,7 +871,7 @@ public class V4Services extends AbstractServices {
// 1. Fetch the contained entity to be removed
final InputStream entry = FSManager.instance(version).
readFile(containedPath(entityId, containedEntitySetName).
- append('(').append(containedEntityId).append(')').toString(), Accept.ATOM);
+ append('(').append(containedEntityId).append(')').toString(), Accept.ATOM);
final ResWrap<AtomEntryImpl> container = atomDeserializer.read(entry, AtomEntryImpl.class);
// 2. Remove the contained entity
@@ -1049,8 +1100,8 @@ public class V4Services extends AbstractServices {
} else {
final ResWrap<JSONPropertyImpl> paramContainer =
mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING),
- new TypeReference<JSONPropertyImpl>() {
- });
+ new TypeReference<JSONPropertyImpl>() {
+ });
property = paramContainer.getPayload();
}
@@ -1091,8 +1142,8 @@ public class V4Services extends AbstractServices {
} else {
final ResWrap<JSONPropertyImpl> paramContainer =
mapper.readValue(IOUtils.toInputStream(param, Constants.ENCODING),
- new TypeReference<JSONPropertyImpl>() {
- });
+ new TypeReference<JSONPropertyImpl>() {
+ });
property = paramContainer.getPayload();
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java
----------------------------------------------------------------------
diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java
new file mode 100644
index 0000000..9efa0fd
--- /dev/null
+++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncBatchRequestWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.api.communication.request.v4;
+
+import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
+import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate;
+import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
+
+public interface AsyncBatchRequestWrapper extends AsyncRequestWrapper<ODataBatchResponse> {
+
+ /**
+ * Gets a changeset batch item instance. A changeset can be submitted embedded into a batch request only.
+ *
+ * @return ODataChangeset instance.
+ */
+ ODataChangeset addChangeset();
+
+ /**
+ * Gets a retrieve batch item instance. A retrieve item can be submitted embedded into a batch request only.
+ *
+ * @return ODataRetrieve instance.
+ */
+ ODataRetrieve addRetrieve();
+
+ /**
+ * Gets an outside change batch item instance. An outside item can be submitted embedded into a batch request only.
+ *
+ * @return ODataOutsideUpdate instance.
+ */
+ ODataOutsideUpdate addOutsideUpdate();
+}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
----------------------------------------------------------------------
diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
index 9fd06dc..17b510f 100644
--- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
+++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/request/v4/AsyncRequestFactory.java
@@ -19,10 +19,13 @@
package org.apache.olingo.client.api.communication.request.v4;
import org.apache.olingo.client.api.communication.request.ODataRequest;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
import org.apache.olingo.client.api.communication.response.ODataResponse;
@SuppressWarnings("unchecked")
public interface AsyncRequestFactory {
<R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest);
+
+ AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest);
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
----------------------------------------------------------------------
diff --git a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
index 87b6904..04d3688 100644
--- a/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
+++ b/lib/client-api/src/main/java/org/apache/olingo/client/api/communication/response/v4/AsyncResponseWrapper.java
@@ -18,6 +18,7 @@
*/
package org.apache.olingo.client.api.communication.response.v4;
+import java.net.URI;
import org.apache.olingo.client.api.communication.response.ODataDeleteResponse;
import org.apache.olingo.client.api.communication.response.ODataResponse;
@@ -50,6 +51,16 @@ public interface AsyncResponseWrapper<R extends ODataResponse> {
R getODataResponse();
/**
+ * Specifies the location for the next monitor check.
+ * <br />
+ * Overrides the location value retrieved among headers and nullifies the previous valid response (if exists).
+ *
+ * @param uri monitor location.
+ * @return the current async response wrapper.
+ */
+ AsyncResponseWrapper<R> forceNextMonitorCheck(URI uri);
+
+ /**
* DeleteA DELETE request sent to the status monitor resource requests that the asynchronous processing be canceled. A
* 200 OK or to a 204 No Content response indicates that the asynchronous processing has been successfully canceled.
*
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
index 53eeb16..51f3a48 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataChangesetResponseItem.java
@@ -25,6 +25,7 @@ import org.apache.olingo.client.api.ODataBatchConstants;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG;
import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse;
+import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl;
/**
* Changeset wrapper for the corresponding batch item.
@@ -116,11 +117,15 @@ public class ODataChangesetResponseItem extends AbstractODataBatchResponseItem {
final Map.Entry<Integer, String> responseLine = ODataBatchUtilities.readResponseLine(batchLineIterator);
LOG.debug("Retrieved item response {}", responseLine);
- if (responseLine.getKey() >= 400) {
- // generate error response
- final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
- LOG.debug("Retrieved item headers {}", headers);
+ final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
+ LOG.debug("Retrieved item headers {}", headers);
+ if (responseLine.getKey() == 202) {
+ // generate async response
+ current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary);
+ return current;
+ } else if (responseLine.getKey() >= 400) {
+ // generate error response
current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary);
return current;
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
index 8919ffe..a180b37 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/ODataRetrieveResponseItem.java
@@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import static org.apache.olingo.client.core.communication.request.batch.AbstractODataBatchResponseItem.LOG;
import org.apache.olingo.client.core.communication.response.batch.ODataBatchErrorResponse;
+import org.apache.olingo.client.core.communication.response.v4.AsyncResponseImpl;
/**
* Retrieve response wrapper for the corresponding batch item.
@@ -58,7 +59,11 @@ public class ODataRetrieveResponseItem extends AbstractODataBatchResponseItem {
final Map<String, Collection<String>> headers = ODataBatchUtilities.readHeaders(batchLineIterator);
LOG.debug("Retrieved item headers {}", headers);
- if (responseLine.getKey() >= 400) {
+ if (responseLine.getKey() == 202) {
+ // generate async response
+ current = new AsyncResponseImpl(responseLine, headers, batchLineIterator, boundary);
+ breakingitem = true;
+ } else if (responseLine.getKey() >= 400) {
// generate error response
current = new ODataBatchErrorResponse(responseLine, headers, batchLineIterator, boundary);
breakingitem = true;
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
index 648cb35..a445013 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/batch/v4/ODataBatchRequestImpl.java
@@ -122,6 +122,9 @@ public class ODataBatchRequestImpl
*/
protected class ODataBatchResponseImpl extends AbstractODataResponse implements ODataBatchResponse {
+ private ODataBatchResponseImpl() {
+ }
+
/**
* Constructor.
*
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java
new file mode 100644
index 0000000..8a3d903
--- /dev/null
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncBatchRequestWrapperImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.olingo.client.core.communication.request.v4;
+
+import java.net.URI;
+import java.util.Collection;
+import org.apache.commons.io.IOUtils;
+import org.apache.olingo.client.api.communication.header.HeaderName;
+import org.apache.olingo.client.api.communication.header.ODataPreferences;
+import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
+import org.apache.olingo.client.api.communication.request.batch.ODataRetrieve;
+import org.apache.olingo.client.api.communication.request.batch.v4.BatchStreamManager;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataOutsideUpdate;
+import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
+import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
+import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper;
+import org.apache.olingo.client.api.v4.ODataClient;
+import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
+
+public class AsyncBatchRequestWrapperImpl extends AsyncRequestWrapperImpl<ODataBatchResponse>
+ implements AsyncBatchRequestWrapper {
+
+ private BatchStreamManager batchStreamManager;
+
+ protected AsyncBatchRequestWrapperImpl(final ODataClient odataClient, final ODataBatchRequest odataRequest) {
+ super(odataClient, odataRequest);
+ batchStreamManager = odataRequest.execute();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ODataChangeset addChangeset() {
+ return batchStreamManager.addChangeset();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ODataRetrieve addRetrieve() {
+ return batchStreamManager.addRetrieve();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ODataOutsideUpdate addOutsideUpdate() {
+ return batchStreamManager.addOutsideUpdate();
+ }
+
+ @Override
+ public AsyncResponseWrapper<ODataBatchResponse> execute() {
+ return new AsyncResponseWrapperImpl(batchStreamManager.getResponse());
+ }
+
+ public class AsyncResponseWrapperImpl
+ extends AsyncRequestWrapperImpl<ODataBatchResponse>.AsyncResponseWrapperImpl {
+
+ /**
+ * Constructor.
+ *
+ * @param res OData batch response.
+ */
+ public AsyncResponseWrapperImpl(final ODataBatchResponse res) {
+ super();
+
+ if (res.getStatusCode() == 202) {
+ retrieveMonitorDetails(res);
+ } else {
+ response = res;
+ }
+ }
+
+ private void retrieveMonitorDetails(final ODataBatchResponse res) {
+ Collection<String> headers = res.getHeader(HeaderName.location.toString());
+ if (headers == null || headers.isEmpty()) {
+ throw new AsyncRequestException("Invalid async request response. Monitor URL not found");
+ } else {
+ this.location = URI.create(headers.iterator().next());
+ }
+
+ headers = res.getHeader(HeaderName.retryAfter.toString());
+ if (headers != null && !headers.isEmpty()) {
+ this.retryAfter = Integer.parseInt(headers.iterator().next());
+ }
+
+ headers = res.getHeader(HeaderName.preferenceApplied.toString());
+ if (headers != null && !headers.isEmpty()) {
+ for (String header : headers) {
+ if (header.equalsIgnoreCase(new ODataPreferences(ODataServiceVersion.V40).respondAsync())) {
+ preferenceApplied = true;
+ }
+ }
+ }
+
+ IOUtils.closeQuietly(res.getRawResponse());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
index d6905dc..d5331bc 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestFactoryImpl.java
@@ -19,6 +19,8 @@
package org.apache.olingo.client.core.communication.request.v4;
import org.apache.olingo.client.api.communication.request.ODataRequest;
+import org.apache.olingo.client.api.communication.request.batch.v4.ODataBatchRequest;
+import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
import org.apache.olingo.client.api.communication.request.v4.AsyncRequestFactory;
import org.apache.olingo.client.api.communication.request.v4.AsyncRequestWrapper;
import org.apache.olingo.client.api.communication.response.ODataResponse;
@@ -39,4 +41,9 @@ public class AsyncRequestFactoryImpl implements AsyncRequestFactory {
public <R extends ODataResponse> AsyncRequestWrapper<R> getAsyncRequestWrapper(final ODataRequest odataRequest) {
return new AsyncRequestWrapperImpl<R>(client, odataRequest);
}
+
+ @Override
+ public AsyncBatchRequestWrapper getAsyncBatchRequestWrapper(final ODataBatchRequest odataRequest) {
+ return new AsyncBatchRequestWrapperImpl(client, odataRequest);
+ }
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
index 57305cd..7aef8a9 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/request/v4/AsyncRequestWrapperImpl.java
@@ -41,7 +41,6 @@ import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapp
import org.apache.olingo.client.api.http.HttpClientException;
import org.apache.olingo.client.api.http.HttpMethod;
import org.apache.olingo.client.api.v4.ODataClient;
-import org.apache.olingo.client.core.communication.header.ODataHeadersImpl;
import org.apache.olingo.client.core.communication.request.AbstractODataRequest;
import org.apache.olingo.client.core.communication.request.AbstractRequest;
import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
@@ -49,34 +48,29 @@ import org.apache.olingo.commons.api.edm.constants.ODataServiceVersion;
public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRequest
implements AsyncRequestWrapper<R> {
- private static final int MAX_RETRY = 5;
+ protected static final int MAX_RETRY = 5;
- private final ODataClient odataClient;
+ protected final ODataClient odataClient;
/**
* Request to be wrapped.
*/
- private final ODataRequest odataRequest;
+ protected final ODataRequest odataRequest;
/**
* HTTP client.
*/
- private final HttpClient httpClient;
+ protected final HttpClient httpClient;
/**
* HTTP request.
*/
- private final HttpUriRequest request;
-
- /**
- * OData request header.
- */
- private final ODataHeadersImpl odataHeaders;
+ protected final HttpUriRequest request;
/**
* Target URI.
*/
- private final URI uri;
+ protected final URI uri;
protected AsyncRequestWrapperImpl(final ODataClient odataClient, final ODataRequest odataRequest) {
this.odataRequest = odataRequest;
@@ -88,9 +82,6 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
this.odataClient = odataClient;
final HttpMethod method = odataRequest.getMethod();
- // initialize default headers
- this.odataHeaders = (ODataHeadersImpl) odataClient.getVersionHeaders();
-
// target uri
this.uri = odataRequest.getURI();
@@ -104,19 +95,19 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
}
@Override
- public AsyncRequestWrapper<R> wait(final int waitInSeconds) {
+ public final AsyncRequestWrapper<R> wait(final int waitInSeconds) {
extendHeader(HeaderName.prefer.toString(), new ODataPreferences(ODataServiceVersion.V40).wait(waitInSeconds));
return this;
}
@Override
- public AsyncRequestWrapper<R> callback(URI url) {
+ public final AsyncRequestWrapper<R> callback(URI url) {
extendHeader(HeaderName.prefer.toString(),
new ODataPreferences(ODataServiceVersion.V40).callback(url.toASCIIString()));
return this;
}
- private void extendHeader(final String headerName, final String headerValue) {
+ protected final void extendHeader(final String headerName, final String headerValue) {
final StringBuilder extended = new StringBuilder();
if (this.odataRequest.getHeaderNames().contains(headerName)) {
extended.append(this.odataRequest.getHeader(headerName)).append(", ");
@@ -130,7 +121,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return new AsyncResponseWrapperImpl(doExecute());
}
- private HttpResponse doExecute() {
+ protected HttpResponse doExecute() {
// Add all available headers
for (String key : odataRequest.getHeaderNames()) {
final String value = odataRequest.getHeader(key);
@@ -143,13 +134,16 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
public class AsyncResponseWrapperImpl implements AsyncResponseWrapper<R> {
- private URI location = null;
+ protected URI location = null;
- private R response = null;
+ protected R response = null;
- private int retryAfter = 5;
+ protected int retryAfter = 5;
- private boolean preferenceApplied = false;
+ protected boolean preferenceApplied = false;
+
+ public AsyncResponseWrapperImpl() {
+ }
/**
* Constructor.
@@ -159,7 +153,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
@SuppressWarnings("unchecked")
public AsyncResponseWrapperImpl(final HttpResponse res) {
if (res.getStatusLine().getStatusCode() == 202) {
- retrieveMonitorDetails(res, true);
+ retrieveMonitorDetails(res);
} else {
response = (R) ((AbstractODataRequest<?>) odataRequest).getResponseTemplate().initFromHttpResponse(res);
}
@@ -177,7 +171,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
final HttpResponse res = checkMonitor(location);
if (res.getStatusLine().getStatusCode() == 202) {
- retrieveMonitorDetails(res, false);
+ retrieveMonitorDetails(res);
} else {
response = instantiateResponse(res);
}
@@ -219,18 +213,34 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return response;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public ODataDeleteResponse delete() {
final ODataDeleteRequest deleteRequest = odataClient.getCUDRequestFactory().getDeleteRequest(location);
return deleteRequest.execute();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public AsyncResponseWrapper<ODataDeleteResponse> asyncDelete() {
return odataClient.getAsyncRequestFactory().<ODataDeleteResponse>getAsyncRequestWrapper(
odataClient.getCUDRequestFactory().getDeleteRequest(location)).execute();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public AsyncResponseWrapper<R> forceNextMonitorCheck(final URI uri) {
+ this.location = uri;
+ this.response = null;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
private R instantiateResponse(final HttpResponse res) {
R odataResponse;
@@ -246,7 +256,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return odataResponse;
}
- private void retrieveMonitorDetails(final HttpResponse res, final boolean includePreferenceApplied) {
+ private void retrieveMonitorDetails(final HttpResponse res) {
Header[] headers = res.getHeaders(HeaderName.location.toString());
if (ArrayUtils.isNotEmpty(headers)) {
this.location = URI.create(headers[0].getValue());
@@ -276,7 +286,7 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
}
}
- private HttpResponse checkMonitor(final URI location) {
+ protected final HttpResponse checkMonitor(final URI location) {
if (location == null) {
throw new AsyncRequestException("Invalid async request response. Missing monitor URL");
}
@@ -287,10 +297,8 @@ public class AsyncRequestWrapperImpl<R extends ODataResponse> extends AbstractRe
return executeHttpRequest(httpClient, monitor);
}
- private HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) {
- checkRequest(odataClient, request);
-
- HttpResponse response;
+ protected final HttpResponse executeHttpRequest(final HttpClient client, final HttpUriRequest req) {
+ final HttpResponse response;
try {
response = client.execute(req);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
index d66fa20..fe8fa52 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/AbstractODataResponse.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-import java.nio.charset.Charset;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -35,6 +34,7 @@ import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.olingo.client.api.communication.header.HeaderName;
+import org.apache.olingo.client.api.communication.request.ODataStreamer;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
import org.apache.olingo.client.api.communication.response.ODataResponse;
import org.apache.olingo.client.api.http.NoContentException;
@@ -257,10 +257,10 @@ public abstract class AbstractODataResponse implements ODataResponse {
this.headers.putAll(partHeaders);
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- LOG.debug("Retrieved payload {}", bos.toString(Charset.forName("UTF-8").toString()));
while (batchLineIterator.hasNext()) {
- bos.write(batchLineIterator.nextLine().getBytes());
+ bos.write(batchLineIterator.nextLine().getBytes(Constants.UTF8));
+ bos.write(ODataStreamer.CRLF);
}
try {
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
index cf7da13..a800037 100644
--- a/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
+++ b/lib/client-core/src/main/java/org/apache/olingo/client/core/communication/response/v4/AsyncResponseImpl.java
@@ -18,9 +18,13 @@
*/
package org.apache.olingo.client.core.communication.response.v4;
+import java.util.Collection;
+import java.util.Map;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.olingo.client.api.communication.request.batch.ODataBatchLineIterator;
import org.apache.olingo.client.api.communication.response.v4.AsyncResponse;
+import org.apache.olingo.client.core.communication.request.batch.ODataBatchController;
import org.apache.olingo.client.core.communication.response.AbstractODataResponse;
/**
@@ -46,4 +50,27 @@ public class AsyncResponseImpl extends AbstractODataResponse implements AsyncRes
public AsyncResponseImpl(final HttpClient client, final HttpResponse res) {
super(client, res);
}
+
+ /**
+ * Constructor to be used inside a batch item.
+ */
+ public AsyncResponseImpl(
+ final Map.Entry<Integer, String> responseLine,
+ final Map<String, Collection<String>> headers,
+ final ODataBatchLineIterator batchLineIterator,
+ final String boundary) {
+ super();
+
+ if (hasBeenInitialized) {
+ throw new IllegalStateException("Request already initialized");
+ }
+
+ this.hasBeenInitialized = true;
+
+ this.batchInfo = new ODataBatchController(batchLineIterator, boundary);
+
+ this.statusCode = responseLine.getKey();
+ this.statusMessage = responseLine.getValue();
+ this.headers.putAll(headers);
+ }
}
http://git-wip-us.apache.org/repos/asf/olingo-odata4/blob/0b05798c/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
----------------------------------------------------------------------
diff --git a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
index 8a798dd..6c24789 100644
--- a/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
+++ b/lib/client-core/src/test/java/org/apache/olingo/client/core/it/v4/BatchTestITCase.java
@@ -26,12 +26,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.Calendar;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpResponse;
import org.apache.olingo.client.api.ODataBatchConstants;
+import org.apache.olingo.client.api.communication.header.HeaderName;
import org.apache.olingo.client.api.communication.request.ODataStreamManager;
import org.apache.olingo.client.api.communication.request.batch.ODataBatchResponseItem;
import org.apache.olingo.client.api.communication.request.batch.ODataChangeset;
@@ -44,10 +46,13 @@ import org.apache.olingo.client.api.communication.request.cud.ODataEntityUpdateR
import org.apache.olingo.client.api.communication.request.cud.v4.UpdateType;
import org.apache.olingo.client.api.communication.request.retrieve.ODataEntityRequest;
import org.apache.olingo.client.api.communication.request.retrieve.ODataEntitySetRequest;
+import org.apache.olingo.client.api.communication.request.v4.AsyncBatchRequestWrapper;
import org.apache.olingo.client.api.communication.response.ODataBatchResponse;
import org.apache.olingo.client.api.communication.response.ODataEntityCreateResponse;
import org.apache.olingo.client.api.communication.response.ODataEntityUpdateResponse;
import org.apache.olingo.client.api.communication.response.ODataResponse;
+import org.apache.olingo.client.api.communication.response.v4.AsyncResponse;
+import org.apache.olingo.client.api.communication.response.v4.AsyncResponseWrapper;
import org.apache.olingo.client.api.uri.v4.URIBuilder;
import org.apache.olingo.client.core.communication.request.AbstractODataStreamManager;
import org.apache.olingo.client.core.communication.request.Wrapper;
@@ -397,7 +402,7 @@ public class BatchTestITCase extends AbstractTestITCase {
}
@Test
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked"})
public void batchRequest() throws EdmPrimitiveTypeException {
// create your request
final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(testStaticServiceRootURL);
@@ -412,8 +417,7 @@ public class BatchTestITCase extends AbstractTestITCase {
// prepare URI
URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL);
- targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);//.
-// expand("Orders").select("PersonID,Orders/OrderID");
+ targetURI.appendEntitySetSegment("Customers").appendKeySegment(1);
// create new request
ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
@@ -520,9 +524,79 @@ public class BatchTestITCase extends AbstractTestITCase {
entres = (ODataEntityRequestImpl.ODataEntityResponseImpl) res;
entity = entres.getBody();
- assertEquals("new last name",
- entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class));
+ assertEquals("new last name", entity.getProperty("LastName").getPrimitiveValue().toCastValue(String.class));
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void async() {
+ // create your request
+ final ODataBatchRequest request = client.getBatchRequestFactory().getBatchRequest(
+ URI.create(testStaticServiceRootURL + "/async/").normalize().toASCIIString());
+ request.setAccept(ACCEPT);
+
+ final AsyncBatchRequestWrapper async = client.getAsyncRequestFactory().getAsyncBatchRequestWrapper(request);
+
+ // -------------------------------------------
+ // Add retrieve item
+ // -------------------------------------------
+ ODataRetrieve retrieve = async.addRetrieve();
+ // prepare URI
+ URIBuilder targetURI = client.getURIBuilder(testStaticServiceRootURL);
+ targetURI.appendEntitySetSegment("People").appendKeySegment(5);
+
+ // create new request
+ ODataEntityRequest<ODataEntity> queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
+ queryReq.setFormat(ODataPubFormat.JSON);
+
+ retrieve.setRequest(queryReq);
+ // -------------------------------------------
+
+ // -------------------------------------------
+ // Add retrieve item
+ // -------------------------------------------
+ retrieve = async.addRetrieve();
+
+ // prepare URI
+ targetURI = client.getURIBuilder(testStaticServiceRootURL).appendEntitySetSegment("Customers").appendKeySegment(1);
+
+ // create new request
+ queryReq = client.getRetrieveRequestFactory().getEntityRequest(targetURI.build());
+
+ retrieve.setRequest(queryReq);
+ // -------------------------------------------
+
+ final AsyncResponseWrapper<ODataBatchResponse> responseWrapper = async.execute();
+
+ assertTrue(responseWrapper.isPreferenceApplied());
+ assertTrue(responseWrapper.isDone());
+
+ final ODataBatchResponse response = responseWrapper.getODataResponse();
+
+ assertEquals(200, response.getStatusCode());
+ assertEquals("Ok", response.getStatusMessage());
+ final Iterator<ODataBatchResponseItem> iter = response.getBody();
+
+ // retrieve the first item (ODataRetrieve)
+ ODataBatchResponseItem item = iter.next();
+ assertTrue(item instanceof ODataRetrieveResponseItem);
+
+ // The service return interim results to an asynchronously executing batch.
+ ODataRetrieveResponseItem retitem = (ODataRetrieveResponseItem) item;
+ ODataResponse res = retitem.next();
+ assertTrue(res instanceof AsyncResponse);
+ assertEquals(202, res.getStatusCode());
+ assertEquals("Accepted", res.getStatusMessage());
+
+ Collection<String> newMonitorLocation = res.getHeader(HeaderName.location);
+ if (newMonitorLocation != null && !newMonitorLocation.isEmpty()) {
+ responseWrapper.forceNextMonitorCheck(URI.create(newMonitorLocation.iterator().next()));
+ // .... now you can start again with isDone() and getODataResponse().
+ }
+
+ assertFalse(retitem.hasNext());
assertFalse(iter.hasNext());
}