You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/07/24 18:10:44 UTC

[airavata-mft] branch develop updated: Handling errors at MFT http download request

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/develop by this push:
     new fbd9168  Handling errors at MFT http download request
fbd9168 is described below

commit fbd91687b941095ea62a9ccc7e55f23967b1fe86
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Jul 24 14:10:30 2021 -0400

    Handling errors at MFT http download request
---
 .../airavata/mft/agent/http/HttpServerHandler.java | 161 +++++++++++----------
 1 file changed, 84 insertions(+), 77 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index 07c0191..f9c026d 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -51,111 +51,118 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
 
     @Override
     public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
-        if (!request.decoderResult().isSuccess()) {
-            sendError(ctx, BAD_REQUEST);
-            return;
-        }
 
-        if (request.method() != GET) {
-            sendError(ctx, METHOD_NOT_ALLOWED);
-            return;
-        }
+        try {
+            if (!request.decoderResult().isSuccess()) {
+                sendError(ctx, BAD_REQUEST);
+                return;
+            }
 
-        final String uri = request.uri().substring(1);
-        logger.info("Received download request through url {}", uri);
+            if (request.method() != GET) {
+                sendError(ctx, METHOD_NOT_ALLOWED);
+                return;
+            }
 
-        HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+            final String uri = request.uri().substring(1);
+            logger.info("Received download request through url {}", uri);
 
-        if (httpTransferRequest == null) {
-            logger.error("Couldn't find transfer request for uri {}", uri);
-            sendError(ctx, NOT_FOUND);
-            return;
-        }
+            HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
 
-        Connector connector = httpTransferRequest.getOtherConnector();
-        MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
+            if (httpTransferRequest == null) {
+                logger.error("Couldn't find transfer request for uri {}", uri);
+                sendError(ctx, NOT_FOUND);
+                return;
+            }
 
-        ConnectorParams params = httpTransferRequest.getConnectorParams();
+            Connector connector = httpTransferRequest.getOtherConnector();
+            MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
 
-        // TODO Load from HTTP Headers
-        AuthToken authToken = httpTransferRequest.getAuthToken();
+            ConnectorParams params = httpTransferRequest.getConnectorParams();
 
-        connector.init(params.getResourceServiceHost(),
-                params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
+            // TODO Load from HTTP Headers
+            AuthToken authToken = httpTransferRequest.getAuthToken();
 
-        metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
-                params.getSecretServiceHost(), params.getSecretServicePort());
+            connector.init(params.getResourceServiceHost(),
+                    params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
 
-        Boolean available = metadataCollector.isAvailable(authToken,
-                httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
+            metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
+                    params.getSecretServiceHost(), params.getSecretServicePort());
 
+            Boolean available = metadataCollector.isAvailable(authToken,
+                    httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
 
-        if (!available) {
-            logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
-            sendError(ctx, NOT_FOUND);
-            return;
-        }
 
-        FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
-                httpTransferRequest.getResourceId(),
-                httpTransferRequest.getChildResourcePath(),
-                httpTransferRequest.getCredentialToken());
+            if (!available) {
+                logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
+                sendError(ctx, NOT_FOUND);
+                return;
+            }
 
-        long fileLength = fileResourceMetadata.getResourceSize();
+            FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
+                    httpTransferRequest.getResourceId(),
+                    httpTransferRequest.getChildResourcePath(),
+                    httpTransferRequest.getCredentialToken());
 
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-        HttpUtil.setContentLength(response, fileLength);
-        setContentTypeHeader(response, httpTransferRequest.getResourceId());
+            long fileLength = fileResourceMetadata.getResourceSize();
 
-        if (HttpUtil.isKeepAlive(request)) {
-            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
-        }
+            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+            HttpUtil.setContentLength(response, fileLength);
+            setContentTypeHeader(response, httpTransferRequest.getResourceId());
+
+            if (HttpUtil.isKeepAlive(request)) {
+                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+            }
 
-        // Write the initial line and the header.
-        ctx.write(response);
+            // Write the initial line and the header.
+            ctx.write(response);
 
-        // Write the content.
-        ChannelFuture sendFileFuture;
-        ChannelFuture lastContentFuture;
+            // Write the content.
+            ChannelFuture sendFileFuture;
+            ChannelFuture lastContentFuture;
 
-        ConnectorContext connectorContext = new ConnectorContext();
-        connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
-        connectorContext.setTransferId(uri);
-        connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
+            ConnectorContext connectorContext = new ConnectorContext();
+            connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
+            connectorContext.setTransferId(uri);
+            connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
 
-        TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
-                httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
-                connectorContext, connector);
+            TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
+                    httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
+                    connectorContext, connector);
 
-        // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
-        Future<Integer> pullStatusFuture = executor.submit(pullTask);
+            // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
+            Future<Integer> pullStatusFuture = executor.submit(pullTask);
 
-        sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
-                ctx.newProgressivePromise());
+            sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
+                    ctx.newProgressivePromise());
 
-        // HttpChunkedInput will write the end marker (LastHttpContent) for us.
-        lastContentFuture = sendFileFuture;
+            // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+            lastContentFuture = sendFileFuture;
 
-        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
-            @Override
-            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
-                if (total < 0) { // total unknown
-                    logger.error(future.channel() + " Transfer progress: " + progress);
-                } else {
-                    logger.error(future.channel() + " Transfer progress: " + progress + " / " + total);
+            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
+                @Override
+                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
+                    if (total < 0) { // total unknown
+                        logger.error(future.channel() + " Transfer progress: " + progress);
+                    } else {
+                        logger.error(future.channel() + " Transfer progress: " + progress + " / " + total);
+                    }
                 }
-            }
 
-            @Override
-            public void operationComplete(ChannelProgressiveFuture future) {
-                System.err.println(future.channel() + " Transfer complete.");
+                @Override
+                public void operationComplete(ChannelProgressiveFuture future) {
+                    System.err.println(future.channel() + " Transfer complete.");
+                }
+            });
+
+            // Decide whether to close the connection or not.
+            if (!HttpUtil.isKeepAlive(request)) {
+                // Close the connection when the whole content is written out.
+                lastContentFuture.addListener(ChannelFutureListener.CLOSE);
             }
-        });
 
-        // Decide whether to close the connection or not.
-        if (!HttpUtil.isKeepAlive(request)) {
-            // Close the connection when the whole content is written out.
-            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+        } catch (Exception e) {
+            logger.error("Errored while processing HTTP download request", e);
+            sendError(ctx, INTERNAL_SERVER_ERROR);
         }
     }