You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/07/24 18:10:44 UTC
[airavata-mft] branch develop updated: Handling errors at MFT http
download request
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/develop by this push:
new fbd9168 Handling errors at MFT http download request
fbd9168 is described below
commit fbd91687b941095ea62a9ccc7e55f23967b1fe86
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Jul 24 14:10:30 2021 -0400
Handling errors at MFT http download request
---
.../airavata/mft/agent/http/HttpServerHandler.java | 161 +++++++++++----------
1 file changed, 84 insertions(+), 77 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index 07c0191..f9c026d 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -51,111 +51,118 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
- if (!request.decoderResult().isSuccess()) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
- if (request.method() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
+ try {
+ if (!request.decoderResult().isSuccess()) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
- final String uri = request.uri().substring(1);
- logger.info("Received download request through url {}", uri);
+ if (request.method() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
- HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+ final String uri = request.uri().substring(1);
+ logger.info("Received download request through url {}", uri);
- if (httpTransferRequest == null) {
- logger.error("Couldn't find transfer request for uri {}", uri);
- sendError(ctx, NOT_FOUND);
- return;
- }
+ HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
- Connector connector = httpTransferRequest.getOtherConnector();
- MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
+ if (httpTransferRequest == null) {
+ logger.error("Couldn't find transfer request for uri {}", uri);
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
- ConnectorParams params = httpTransferRequest.getConnectorParams();
+ Connector connector = httpTransferRequest.getOtherConnector();
+ MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
- // TODO Load from HTTP Headers
- AuthToken authToken = httpTransferRequest.getAuthToken();
+ ConnectorParams params = httpTransferRequest.getConnectorParams();
- connector.init(params.getResourceServiceHost(),
- params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
+ // TODO Load from HTTP Headers
+ AuthToken authToken = httpTransferRequest.getAuthToken();
- metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
- params.getSecretServiceHost(), params.getSecretServicePort());
+ connector.init(params.getResourceServiceHost(),
+ params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
- Boolean available = metadataCollector.isAvailable(authToken,
- httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
+ metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
+ params.getSecretServiceHost(), params.getSecretServicePort());
+ Boolean available = metadataCollector.isAvailable(authToken,
+ httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
- if (!available) {
- logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
- sendError(ctx, NOT_FOUND);
- return;
- }
- FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
- httpTransferRequest.getResourceId(),
- httpTransferRequest.getChildResourcePath(),
- httpTransferRequest.getCredentialToken());
+ if (!available) {
+ logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
- long fileLength = fileResourceMetadata.getResourceSize();
+ FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
+ httpTransferRequest.getResourceId(),
+ httpTransferRequest.getChildResourcePath(),
+ httpTransferRequest.getCredentialToken());
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- HttpUtil.setContentLength(response, fileLength);
- setContentTypeHeader(response, httpTransferRequest.getResourceId());
+ long fileLength = fileResourceMetadata.getResourceSize();
- if (HttpUtil.isKeepAlive(request)) {
- response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
- }
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ HttpUtil.setContentLength(response, fileLength);
+ setContentTypeHeader(response, httpTransferRequest.getResourceId());
+
+ if (HttpUtil.isKeepAlive(request)) {
+ response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+ }
- // Write the initial line and the header.
- ctx.write(response);
+ // Write the initial line and the header.
+ ctx.write(response);
- // Write the content.
- ChannelFuture sendFileFuture;
- ChannelFuture lastContentFuture;
+ // Write the content.
+ ChannelFuture sendFileFuture;
+ ChannelFuture lastContentFuture;
- ConnectorContext connectorContext = new ConnectorContext();
- connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
- connectorContext.setTransferId(uri);
- connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
+ ConnectorContext connectorContext = new ConnectorContext();
+ connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
+ connectorContext.setTransferId(uri);
+ connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
- TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
- httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
- connectorContext, connector);
+ TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
+ httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
+ connectorContext, connector);
- // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
- Future<Integer> pullStatusFuture = executor.submit(pullTask);
+ // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
+ Future<Integer> pullStatusFuture = executor.submit(pullTask);
- sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
- ctx.newProgressivePromise());
+ sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
+ ctx.newProgressivePromise());
- // HttpChunkedInput will write the end marker (LastHttpContent) for us.
- lastContentFuture = sendFileFuture;
+ // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+ lastContentFuture = sendFileFuture;
- sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
- @Override
- public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
- if (total < 0) { // total unknown
- logger.error(future.channel() + " Transfer progress: " + progress);
- } else {
- logger.error(future.channel() + " Transfer progress: " + progress + " / " + total);
+ sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
+ @Override
+ public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
+ if (total < 0) { // total unknown
+ logger.error(future.channel() + " Transfer progress: " + progress);
+ } else {
+ logger.error(future.channel() + " Transfer progress: " + progress + " / " + total);
+ }
}
- }
- @Override
- public void operationComplete(ChannelProgressiveFuture future) {
- System.err.println(future.channel() + " Transfer complete.");
+ @Override
+ public void operationComplete(ChannelProgressiveFuture future) {
+ System.err.println(future.channel() + " Transfer complete.");
+ }
+ });
+
+ // Decide whether to close the connection or not.
+ if (!HttpUtil.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
- });
- // Decide whether to close the connection or not.
- if (!HttpUtil.isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ } catch (Exception e) {
+ logger.error("Errored while processing HTTP download request", e);
+ sendError(ctx, INTERNAL_SERVER_ERROR);
}
}