You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/10/23 03:32:45 UTC
[airavata-mft] branch develop updated: Simplifying the streaming
logic to perform transfer using a single thread
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/develop by this push:
new 490a953 Simplifying the streaming logic to perform transfer using a single thread
490a953 is described below
commit 490a953256ea411efc7e0ae5f91aa7f15cf71bba
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Oct 22 23:32:36 2021 -0400
Simplifying the streaming logic to perform transfer using a single thread
---
.../org/apache/airavata/mft/agent/MFTAgent.java | 49 ++--
.../airavata/mft/agent/TransportMediator.java | 254 +++++++--------------
.../mft/agent/http/AgentHttpDownloadData.java | 105 +++++++++
.../airavata/mft/agent/http/ConnectorParams.java | 60 -----
.../mft/agent/http/HttpDownloadRequest.java | 75 ------
.../airavata/mft/agent/http/HttpServerHandler.java | 62 +----
.../mft/agent/http/HttpTransferRequest.java | 105 ---------
.../mft/agent/http/HttpTransferRequestsStore.java | 16 +-
.../apache/airavata/mft/agent/rpc/RPCParser.java | 44 ++--
agent/src/main/resources/application.properties | 3 +-
.../airavata/mft/core/ConnectorResolver.java | 103 ++-------
.../airavata/mft/core/FileResourceMetadata.java | 2 +-
.../org/apache/airavata/mft/core/TransferTask.java | 53 -----
.../airavata/mft/core/api/ConnectorConfig.java | 167 ++++++++++++++
.../airavata/mft/core/api/IncomingConnector.java | 27 +++
.../airavata/mft/core/api/OutgoingConnector.java | 27 +++
scripts/build.sh | 3 +-
.../src/main/resources/applicationContext.xml | 2 +-
.../mft/transport/scp/LimitInputStream.java | 94 ++++++++
.../mft/transport/scp/SCPIncomingConnector.java | 229 +++++++++++++++++++
.../mft/transport/scp/SCPMetadataCollector.java | 2 +-
.../mft/transport/scp/SCPOutgoingConnector.java | 223 ++++++++++++++++++
22 files changed, 1067 insertions(+), 638 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 5ccf25a..bec7945 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -35,9 +35,9 @@ import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.api.service.CallbackEndpoint;
import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.core.ConnectorResolver;
+import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
@@ -172,14 +172,6 @@ public class MFTAgent implements CommandLineRunner {
.setPublisher(agentId)
.setDescription("Starting the transfer"));
- Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
- Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
- inConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
- Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
- Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
- outConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
@@ -188,6 +180,34 @@ public class MFTAgent implements CommandLineRunner {
MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+ FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
+ request.getMftAuthorizationToken(),
+ request.getSourceResourceId(),
+ request.getSourceToken());
+
+
+ ConnectorConfig srcCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+ .withAuthToken(request.getMftAuthorizationToken())
+ .withResourceServiceHost(resourceServiceHost)
+ .withResourceServicePort(resourceServicePort)
+ .withSecretServiceHost(secretServiceHost)
+ .withSecretServicePort(secretServicePort)
+ .withTransferId(transferId)
+ .withResourceId(request.getSourceResourceId())
+ .withCredentialToken(request.getSourceToken())
+ .withMetadata(srcMetadata).build();
+
+ ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+ .withAuthToken(request.getMftAuthorizationToken())
+ .withResourceServiceHost(resourceServiceHost)
+ .withResourceServicePort(resourceServicePort)
+ .withSecretServiceHost(secretServiceHost)
+ .withSecretServicePort(secretServicePort)
+ .withTransferId(transferId)
+ .withResourceId(request.getDestinationResourceId())
+ .withCredentialToken(request.getDestinationToken())
+ .withMetadata(srcMetadata).build();
+
mftConsulClient.submitTransferStateToProcess(transferId, agentId, new TransferState()
.setState("STARTED")
.setPercentage(0)
@@ -195,13 +215,11 @@ public class MFTAgent implements CommandLineRunner {
.setPublisher(agentId)
.setDescription("Started the transfer"));
-
- TransferApiRequest finalRequest = request;
- mediator.transfer(transferId, request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
+ mediator.transferSingleThread(transferId, request, srcCC, dstCC,
(id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
- handleCallbacks(finalRequest.getCallbackEndpointsList(), id, st);
+
} catch (MFTConsulClientException e) {
logger.error("Failed while updating transfer state", e);
}
@@ -213,8 +231,7 @@ public class MFTAgent implements CommandLineRunner {
} catch (Exception e) {
logger.error("Failed while deleting scheduled path for transfer {}", id);
}
- }
- );
+ });
logger.info("Started the transfer " + transferId);
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index b9195b8..6822275 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -20,17 +20,17 @@ package org.apache.airavata.mft.agent;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
public class TransportMediator {
@@ -40,197 +40,113 @@ public class TransportMediator {
/*
Number of maximum transfers handled at atime
*/
- private int concurrentTransfers = 10;
- private ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
- private ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
+ private final int concurrentTransfers = 10;
+ private final ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
+ private final ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
- public void destroy() {
- executor.shutdown();
- }
-
- public void transfer(String transferId, TransferApiRequest request, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
- MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
- BiConsumer<String, Boolean> exitingCallback) throws Exception {
+ public void transferSingleThread(String transferId,
+ TransferApiRequest request,
+ ConnectorConfig srcCC,
+ ConnectorConfig dstCC,
+ BiConsumer<String, TransferState> onStatusCallback,
+ BiConsumer<String, Boolean> exitingCallback) {
- FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
- request.getMftAuthorizationToken(),
- request.getSourceResourceId(),
- request.getSourceToken());
+ executor.submit(() -> {
- final long resourceSize = srcMetadata.getResourceSize();
- logger.debug("Source file size {}. MD5 {}", resourceSize, srcMetadata.getMd5sum());
+ final AtomicBoolean transferInProgress = new AtomicBoolean(true);
- final DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
- final ReentrantLock statusLock = new ReentrantLock();
+ try {
- ConnectorContext context = new ConnectorContext();
- context.setMetadata(srcMetadata);
- context.setStreamBuffer(streamBuffer);
- context.setTransferId(transferId);
+ long start = System.currentTimeMillis();
- TransferTask recvTask = new TransferTask(request.getMftAuthorizationToken(), request.getSourceResourceId(),
- request.getSourceChildResourcePath(), request.getSourceToken(), context, inConnector);
- TransferTask sendTask = new TransferTask(request.getMftAuthorizationToken(), request.getDestinationResourceId(),
- request.getDestinationChildResourcePath(), request.getDestinationToken(), context, outConnector);
- List<Future<Integer>> futureList = new ArrayList<>();
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(100)
+ .setState("RUNNING")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer successfully completed"));
- ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
+ Optional<IncomingConnector> inConnectorOp = ConnectorResolver.resolveIncomingConnector(request.getSourceType());
+ Optional<OutgoingConnector> outConnectorOp = ConnectorResolver.resolveOutgoingConnector(request.getDestinationType());
- long startTime = System.nanoTime();
+ IncomingConnector inConnector = inConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an in connector for type " + request.getSourceType()));
- futureList.add(completionService.submit(recvTask));
- futureList.add(completionService.submit(sendTask));
+ OutgoingConnector outConnector = outConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an out connector for type " + request.getDestinationType()));
- final AtomicBoolean transferInProgress = new AtomicBoolean(true);
- final AtomicBoolean transferSuccess = new AtomicBoolean(true);
+ inConnector.init(srcCC);
+ outConnector.init(dstCC);
+ String srcChild = request.getSourceChildResourcePath();
+ String dstChild = request.getDestinationChildResourcePath();
- // Monitoring the completeness of the transfer
- Thread monitorThread = new Thread(new Runnable() {
- @Override
- public void run() {
+ InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+ OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
- try {
- int futureCnt = futureList.size();
- boolean transferErrored = false;
+ long count = 0;
+ final AtomicLong countAtomic = new AtomicLong();
+ countAtomic.set(count);
- for (int i = 0; i < futureCnt; i++) {
- Future<Integer> ft = completionService.take();
- futureList.remove(ft);
+ monitorPool.submit(() -> {
+ while (true) {
try {
- ft.get();
- } catch (Exception e) {
-
- logger.error("One task failed with error", e);
- transferErrored = true;
- statusLock.lock();
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(0)
- .setState("FAILED")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
- transferInProgress.set(false);
- transferSuccess.set(false);
- statusLock.unlock();
-
- for (Future<Integer> f : futureList) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- logger.error("Sleep failed", e);
- }
- f.cancel(true);
- }
- futureList.clear();
- }
- }
-
- if (!transferErrored) {
- Boolean transferred = destMetadataCollector.isAvailable(
- request.getMftAuthorizationToken(),
- request.getDestinationResourceId(),
- request.getDestinationToken());
-
-
- if (!transferred) {
- logger.error("Transfer completed but resource is not available in destination");
- throw new Exception("Transfer completed but resource is not available in destination");
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // Ignore
}
-
- FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
- request.getMftAuthorizationToken(),
- request.getDestinationResourceId(),
- request.getDestinationToken());
-
-
- boolean doIntegrityVerify = true;
-
- if (srcMetadata.getMd5sum() == null) {
- logger.warn("MD5 sum is not available for source resource. So this disables integrity verification");
- doIntegrityVerify = false;
- } else if (destMetadata.getMd5sum() == null) {
- logger.warn("MD5 sum is not available for destination resource. So this disables integrity verification");
- doIntegrityVerify = false;
+ if (!transferInProgress.get()) {
+ logger.info("Status monitor is exiting for transfer {}", transferId);
+ break;
}
+ double transferPercentage = countAtomic.get() * 100.0/ srcCC.getMetadata().getResourceSize();
+ logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(transferPercentage)
+ .setState("RUNNING")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer Progress Updated"));
+ }
+ });
- if (doIntegrityVerify && !destMetadata.getMd5sum().equals(srcMetadata.getMd5sum())) {
- logger.error("Resource integrity violated. MD5 sums are not matching. Source md5 {} destination md5 {}",
- srcMetadata.getMd5sum(), destMetadata.getMd5sum());
- throw new Exception("Resource integrity violated. MD5 sums are not matching. Source md5 " + srcMetadata.getMd5sum()
- + " destination md5 " + destMetadata.getMd5sum());
- }
+ int n;
+ byte[] buffer = new byte[128 * 1024];
+ for(count = 0L; -1 != (n = inputStream.read(buffer)); count += (long)n) {
+ outputStream.write(buffer, 0, n);
+ countAtomic.set(count);
+ }
- // Check
+ inConnector.complete();
+ outConnector.complete();
- long endTime = System.nanoTime();
+ long time = (System.currentTimeMillis() - start) / 1000;
- double time = (endTime - startTime) * 1.0 / 1000000000;
+ logger.info("Transfer {} completed. Time {} S. Speed {} MB/s", transferId, time,
+ (srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));
- statusLock.lock();
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(100)
- .setState("COMPLETED")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer successfully completed"));
- transferInProgress.set(false);
- transferSuccess.set(true);
- statusLock.unlock();
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(100)
+ .setState("COMPLETED")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer successfully completed"));
- logger.info("Transfer {} completed. Speed {} MB/s", transferId,
- (srcMetadata.getResourceSize() * 1.0 / time) / (1024 * 1024));
- }
- } catch (Exception e) {
-
- statusLock.lock();
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(0)
- .setState("FAILED")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
- transferInProgress.set(false);
- transferSuccess.set(false);
- statusLock.unlock();
-
- logger.error("Transfer {} failed", transferId, e);
- } finally {
- inConnector.destroy();
- outConnector.destroy();
- transferInProgress.set(false);
- exitingCallback.accept(transferId,transferSuccess.get());
- }
- }
- });
+ exitingCallback.accept(transferId, true);
+ } catch (Exception e) {
- // Monitoring the status of the transfer
- Thread progressThread = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
+ logger.error("Transfer {} failed with error", transferId, e);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // Ignore
- }
- statusLock.lock();
- if (!transferInProgress.get()){
- statusLock.unlock();
- logger.info("Status monitor is exiting for transfer {}", transferId);
- break;
- }
- double transferPercentage = streamBuffer.getProcessedBytes() * 100.0/ resourceSize;
- logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(transferPercentage)
- .setState("RUNNING")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer Progress Updated"));
- statusLock.unlock();
- }
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(0)
+ .setState("FAILED")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+ } finally {
+ transferInProgress.set(false);
}
});
+ }
- monitorPool.submit(monitorThread);
- monitorPool.submit(progressThread);
+ public void destroy() {
+ executor.shutdown();
+ monitorPool.shutdown();
}
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
new file mode 100644
index 0000000..1b8930c
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingConnector;
+
+public class AgentHttpDownloadData {
+ private IncomingConnector incomingConnector;
+ private ConnectorConfig connectorConfig;
+ private String childResourcePath;
+ private long createdTime = System.currentTimeMillis();
+
+ public IncomingConnector getIncomingConnector() {
+ return incomingConnector;
+ }
+
+ public void setIncomingConnector(IncomingConnector incomingConnector) {
+ this.incomingConnector = incomingConnector;
+ }
+
+ public ConnectorConfig getConnectorConfig() {
+ return connectorConfig;
+ }
+
+ public void setConnectorConfig(ConnectorConfig connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ }
+
+ public String getChildResourcePath() {
+ return childResourcePath;
+ }
+
+ public void setChildResourcePath(String childResourcePath) {
+ this.childResourcePath = childResourcePath;
+ }
+
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public void setCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ }
+
+
+ public static final class AgentHttpDownloadDataBuilder {
+ private IncomingConnector incomingConnector;
+ private ConnectorConfig connectorConfig;
+ private String childResourcePath;
+ private long createdTime = System.currentTimeMillis();
+
+ private AgentHttpDownloadDataBuilder() {
+ }
+
+ public static AgentHttpDownloadDataBuilder newBuilder() {
+ return new AgentHttpDownloadDataBuilder();
+ }
+
+ public AgentHttpDownloadDataBuilder withIncomingConnector(IncomingConnector incomingConnector) {
+ this.incomingConnector = incomingConnector;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withChildResourcePath(String childResourcePath) {
+ this.childResourcePath = childResourcePath;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ return this;
+ }
+
+
+ public AgentHttpDownloadData build() {
+ AgentHttpDownloadData agentHttpDownloadData = new AgentHttpDownloadData();
+ agentHttpDownloadData.setIncomingConnector(incomingConnector);
+ agentHttpDownloadData.setConnectorConfig(connectorConfig);
+ agentHttpDownloadData.setChildResourcePath(childResourcePath);
+ agentHttpDownloadData.setCreatedTime(createdTime);
+ return agentHttpDownloadData;
+ }
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
deleted file mode 100644
index 99a4b38..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-public class ConnectorParams {
-
- private String resourceServiceHost, secretServiceHost;
- private int resourceServicePort, secretServicePort;
-
- public String getResourceServiceHost() {
- return resourceServiceHost;
- }
-
- public ConnectorParams setResourceServiceHost(String resourceServiceHost) {
- this.resourceServiceHost = resourceServiceHost;
- return this;
- }
-
- public String getSecretServiceHost() {
- return secretServiceHost;
- }
-
- public ConnectorParams setSecretServiceHost(String secretServiceHost) {
- this.secretServiceHost = secretServiceHost;
- return this;
- }
-
- public int getResourceServicePort() {
- return resourceServicePort;
- }
-
- public ConnectorParams setResourceServicePort(int resourceServicePort) {
- this.resourceServicePort = resourceServicePort;
- return this;
- }
-
- public int getSecretServicePort() {
- return secretServicePort;
- }
-
- public ConnectorParams setSecretServicePort(int secretServicePort) {
- this.secretServicePort = secretServicePort;
- return this;
- }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
deleted file mode 100644
index 2cf56af..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpDownloadRequest {
-
- private ConnectorParams connectorParams;
- private Connector srcConnector;
- private MetadataCollector srcMetadataCollector;
- private String srcResourceId;
- private String srcToken;
-
- public ConnectorParams getConnectorParams() {
- return connectorParams;
- }
-
- public HttpDownloadRequest setConnectorParams(ConnectorParams connectorParams) {
- this.connectorParams = connectorParams;
- return this;
- }
-
- public Connector getSrcConnector() {
- return srcConnector;
- }
-
- public HttpDownloadRequest setSrcConnector(Connector srcConnector) {
- this.srcConnector = srcConnector;
- return this;
- }
-
- public MetadataCollector getSrcMetadataCollector() {
- return srcMetadataCollector;
- }
-
- public HttpDownloadRequest setSrcMetadataCollector(MetadataCollector srcMetadataCollector) {
- this.srcMetadataCollector = srcMetadataCollector;
- return this;
- }
-
- public String getSrcResourceId() {
- return srcResourceId;
- }
-
- public HttpDownloadRequest setSrcResourceId(String srcResourceId) {
- this.srcResourceId = srcResourceId;
- return this;
- }
-
- public String getSrcToken() {
- return srcToken;
- }
-
- public HttpDownloadRequest setSrcToken(String srcToken) {
- this.srcToken = srcToken;
- return this;
- }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index f03a952..87153b4 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -25,14 +25,12 @@ import io.netty.util.CharsetUtil;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.IncomingConnector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.activation.MimetypesFileTypeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.io.InputStream;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
@@ -43,7 +41,6 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
private final HttpTransferRequestsStore transferRequestsStore;
- private final ExecutorService executor = Executors.newFixedThreadPool(10);
public HttpServerHandler(HttpTransferRequestsStore transferRequestsStore) {
this.transferRequestsStore = transferRequestsStore;
@@ -66,48 +63,19 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
final String uri = request.uri().substring(request.uri().lastIndexOf("/") + 1);
logger.info("Received download request through url {}", uri);
- HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+ AgentHttpDownloadData downloadData = transferRequestsStore.getDownloadRequest(uri);
- if (httpTransferRequest == null) {
+ if (downloadData == null) {
logger.error("Couldn't find transfer request for uri {}", uri);
sendError(ctx, NOT_FOUND);
return;
}
- Connector connector = httpTransferRequest.getOtherConnector();
- MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
-
- ConnectorParams params = httpTransferRequest.getConnectorParams();
-
- // TODO Load from HTTP Headers
- AuthToken authToken = httpTransferRequest.getAuthToken();
-
- connector.init(params.getResourceServiceHost(),
- params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
-
- metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
- params.getSecretServiceHost(), params.getSecretServicePort());
-
- Boolean available = metadataCollector.isAvailable(authToken,
- httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
-
-
- if (!available) {
- logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
- sendError(ctx, NOT_FOUND);
- return;
- }
-
- FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
- httpTransferRequest.getResourceId(),
- httpTransferRequest.getChildResourcePath(),
- httpTransferRequest.getCredentialToken());
-
- long fileLength = fileResourceMetadata.getResourceSize();
+ long fileLength = downloadData.getConnectorConfig().getMetadata().getResourceSize();
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
HttpUtil.setContentLength(response, fileLength);
- setContentTypeHeader(response, fileResourceMetadata.getFriendlyName());
+ setContentTypeHeader(response, downloadData.getConnectorConfig().getMetadata().getFriendlyName());
if (HttpUtil.isKeepAlive(request)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
@@ -120,19 +88,13 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
ChannelFuture sendFileFuture;
ChannelFuture lastContentFuture;
- ConnectorContext connectorContext = new ConnectorContext();
- connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
- connectorContext.setTransferId(uri);
- connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
-
- TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
- httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
- connectorContext, connector);
-
- // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
- Future<Integer> pullStatusFuture = executor.submit(pullTask);
+ IncomingConnector incomingConnector = downloadData.getIncomingConnector();
+ incomingConnector.init(downloadData.getConnectorConfig());
+ InputStream inputStream = downloadData.getChildResourcePath().equals("")?
+ incomingConnector.fetchInputStream() :
+ incomingConnector.fetchInputStream(downloadData.getChildResourcePath());
- sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
+ sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(inputStream)),
ctx.newProgressivePromise());
// HttpChunkedInput will write the end marker (LastHttpContent) for us.
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
deleted file mode 100644
index c6d5192..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpTransferRequest {
- private Connector otherConnector;
- private MetadataCollector otherMetadataCollector;
- private ConnectorParams connectorParams;
- private String resourceId;
- private String childResourcePath;
- private String credentialToken;
- private long createdTime = System.currentTimeMillis();
- private AuthToken authToken;
-
- public Connector getOtherConnector() {
- return otherConnector;
- }
-
- public HttpTransferRequest setOtherConnector(Connector otherConnector) {
- this.otherConnector = otherConnector;
- return this;
- }
-
- public MetadataCollector getOtherMetadataCollector() {
- return otherMetadataCollector;
- }
-
- public HttpTransferRequest setOtherMetadataCollector(MetadataCollector otherMetadataCollector) {
- this.otherMetadataCollector = otherMetadataCollector;
- return this;
- }
-
- public String getResourceId() {
- return resourceId;
- }
-
- public HttpTransferRequest setResourceId(String resourceId) {
- this.resourceId = resourceId;
- return this;
- }
-
- public String getChildResourcePath() {
- return childResourcePath;
- }
-
- public HttpTransferRequest setChildResourcePath(String childResourcePath) {
- this.childResourcePath = childResourcePath;
- return this;
- }
-
- public String getCredentialToken() {
- return credentialToken;
- }
-
- public HttpTransferRequest setCredentialToken(String credentialToken) {
- this.credentialToken = credentialToken;
- return this;
- }
-
- public ConnectorParams getConnectorParams() {
- return connectorParams;
- }
-
- public HttpTransferRequest setConnectorParams(ConnectorParams connectorParams) {
- this.connectorParams = connectorParams;
- return this;
- }
-
- public long getCreatedTime() {
- return createdTime;
- }
-
- public HttpTransferRequest setCreatedTime(long createdTime) {
- this.createdTime = createdTime;
- return this;
- }
-
- public AuthToken getAuthToken() {
- return authToken;
- }
-
- public HttpTransferRequest setAuthToken(AuthToken authToken) {
- this.authToken = authToken;
- return this;
- }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
index dd2f17e..56c468b 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
@@ -31,8 +31,8 @@ public class HttpTransferRequestsStore {
private static final Logger logger = LoggerFactory.getLogger(HttpTransferRequestsStore.class);
- final private Map<String, HttpTransferRequest> downloadRequestStore = new ConcurrentHashMap<>();
- final private Map<String, HttpTransferRequest> uploadRequestStore = new ConcurrentHashMap<>();
+ final private Map<String, AgentHttpDownloadData> downloadRequestStore = new ConcurrentHashMap<>();
+ final private Map<String, AgentHttpDownloadData> uploadRequestStore = new ConcurrentHashMap<>();
final private ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
private long entryExpiryTimeMS = 300 * 1000;
@@ -56,32 +56,32 @@ public class HttpTransferRequestsStore {
}, 2, 10, TimeUnit.SECONDS);
}
- public String addDownloadRequest(HttpTransferRequest request) {
+ public String addDownloadRequest(AgentHttpDownloadData request) {
String randomUrl = UUID.randomUUID().toString();
downloadRequestStore.put(randomUrl, request);
return randomUrl;
}
- public HttpTransferRequest getDownloadRequest(String url) {
+ public AgentHttpDownloadData getDownloadRequest(String url) {
//TODO Need to block concurrent calls to same url as connectors are not thread safe
- HttpTransferRequest request = downloadRequestStore.get(url);
+ AgentHttpDownloadData request = downloadRequestStore.get(url);
if (request != null) {
downloadRequestStore.remove(url);
}
return request;
}
- public String addUploadRequest(HttpTransferRequest request) {
+ public String addUploadRequest(AgentHttpDownloadData request) {
String randomUrl = UUID.randomUUID().toString();
uploadRequestStore.put(randomUrl, request);
return randomUrl;
}
- public HttpTransferRequest getUploadRequest(String url) {
+ public AgentHttpDownloadData getUploadRequest(String url) {
//TODO Need to block concurrent calls to same url as connectors are not thread safe
- HttpTransferRequest request = uploadRequestStore.get(url);
+ AgentHttpDownloadData request = uploadRequestStore.get(url);
if (request != null) {
uploadRequestStore.remove(url);
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index f5c47e4..8e1e155 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -21,8 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.util.JsonFormat;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
-import org.apache.airavata.mft.agent.http.ConnectorParams;
-import org.apache.airavata.mft.agent.http.HttpTransferRequest;
+import org.apache.airavata.mft.agent.http.AgentHttpDownloadData;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.core.ConnectorResolver;
@@ -30,6 +29,8 @@ import org.apache.airavata.mft.core.DirectoryResourceMetadata;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingConnector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,22 +155,33 @@ public class RPCParser {
mftAuthorizationToken = tokenBuilder.build();
metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
- Optional<Connector> connectorOp = ConnectorResolver.resolveConnector(storeType, "IN");
+ Optional<IncomingConnector> connectorOp = ConnectorResolver.resolveIncomingConnector(storeType);
if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
- HttpTransferRequest transferRequest = new HttpTransferRequest()
- .setConnectorParams(new ConnectorParams()
- .setResourceServiceHost(resourceServiceHost)
- .setResourceServicePort(resourceServicePort)
- .setSecretServiceHost(secretServiceHost)
- .setSecretServicePort(secretServicePort))
- .setResourceId(resourceId)
- .setChildResourcePath(childResourcePath)
- .setCredentialToken(sourceToken)
- .setOtherMetadataCollector(metadataCollectorOp.get())
- .setOtherConnector(connectorOp.get())
- .setAuthToken(mftAuthorizationToken);
- String url = httpTransferRequestsStore.addDownloadRequest(transferRequest);
+
+ MetadataCollector metadataCollector = metadataCollectorOp.get();
+ metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+ FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(
+ mftAuthorizationToken,
+ resourceId,
+ childResourcePath,
+ sourceToken);
+
+ AgentHttpDownloadData downloadData = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
+ .withChildResourcePath(childResourcePath)
+ .withIncomingConnector(connectorOp.get())
+ .withConnectorConfig(ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+ .withResourceServiceHost(resourceServiceHost)
+ .withResourceServicePort(resourceServicePort)
+ .withSecretServiceHost(secretServiceHost)
+ .withSecretServicePort(secretServicePort)
+ .withResourceId(resourceId)
+ .withCredentialToken(sourceToken)
+ .withAuthToken(mftAuthorizationToken)
+ .withMetadata(fileResourceMetadata).build()).build();
+
+ String url = httpTransferRequestsStore.addDownloadRequest(downloadData);
return (agentAdvertisedUrl.endsWith("/")? agentAdvertisedUrl : agentAdvertisedUrl + "/") + url;
} else {
diff --git a/agent/src/main/resources/application.properties b/agent/src/main/resources/application.properties
index d5949da..4d8d87c 100644
--- a/agent/src/main/resources/application.properties
+++ b/agent/src/main/resources/application.properties
@@ -31,4 +31,5 @@ consul.port=8500
resource.service.host=localhost
resource.service.port=7002
secret.service.host=localhost
-secret.service.port=7003
\ No newline at end of file
+secret.service.port=7003
+agent.advertised.url=http://localhost:3333
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index 469d19e..e6b5ef1 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -17,101 +17,42 @@
package org.apache.airavata.mft.core;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.OutgoingConnector;
-import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
public final class ConnectorResolver {
- public static Optional<Connector> resolveConnector(String type, String direction) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+
+ public static Optional<IncomingConnector> resolveIncomingConnector(String type) throws Exception {
String className = null;
switch (type) {
case "SCP":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.scp.SCPReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.scp.SCPSender";
- break;
- }
- break;
- case "LOCAL":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.local.LocalReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.local.LocalSender";
- break;
- }
- break;
- case "S3":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.s3.S3Receiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.s3.S3Sender";
- break;
- }
- break;
- case "BOX":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.box.BoxReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.box.BoxSender";
- break;
- }
- break;
- case "AZURE":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.azure.AzureReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.azure.AzureSender";
- break;
- }
+ className = "org.apache.airavata.mft.transport.scp.SCPIncomingConnector";
break;
- case "GCS":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.gcp.GCSReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.gcp.GCSSender";
- break;
- }
- break;
- case "DROPBOX":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.dropbox.DropboxReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.dropbox.DropboxSender";
- break;
- }
- break;
- case "FTP":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.ftp.FTPReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.ftp.FTPSender";
- break;
- }
+ }
+
+ if (className != null) {
+ Class<?> aClass = Class.forName(className);
+ return Optional.of((IncomingConnector) aClass.getDeclaredConstructor().newInstance());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ public static Optional<OutgoingConnector> resolveOutgoingConnector(String type) throws Exception {
+
+ String className = null;
+ switch (type) {
+ case "SCP":
+ className = "org.apache.airavata.mft.transport.scp.SCPOutgoingConnector";
break;
}
if (className != null) {
Class<?> aClass = Class.forName(className);
- return Optional.of((Connector) aClass.getDeclaredConstructor().newInstance());
+ return Optional.of((OutgoingConnector) aClass.getDeclaredConstructor().newInstance());
} else {
return Optional.empty();
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
index d61d743..7033d02 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
@@ -91,7 +91,7 @@ public class FileResourceMetadata {
private Builder() {
}
- public static Builder getBuilder() {
+ public static Builder newBuilder() {
return new Builder();
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
deleted file mode 100644
index 42a0e08..0000000
--- a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.core;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-
-import java.util.concurrent.Callable;
-
-public class TransferTask implements Callable<Integer> {
-
- private Connector connector;
- private ConnectorContext context;
- private String resourceId;
- private String childResourcePath;
- private String credentialToken;
- private AuthToken authToken;
-
- public TransferTask(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
- ConnectorContext context, Connector connector) {
- this.connector = connector;
- this.context = context;
- this.resourceId = resourceId;
- this.authToken = authToken;
- this.credentialToken = credentialToken;
- this.childResourcePath = childResourcePath;
- }
-
- @Override
- public Integer call() throws Exception {
- if (childResourcePath == null || "".equals(childResourcePath)) {
- this.connector.startStream(authToken, resourceId, credentialToken, context);
- } else {
- this.connector.startStream(authToken, resourceId, childResourcePath, credentialToken, context);
- }
- return 0;
- }
-}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
new file mode 100644
index 0000000..bc754e8
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
@@ -0,0 +1,167 @@
+package org.apache.airavata.mft.core.api;
+
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+
+public class ConnectorConfig {
+ private String resourceServiceHost;
+ private int resourceServicePort;
+ private String secretServiceHost;
+ private int secretServicePort;
+ private String resourceId;
+ private String credentialToken;
+ private AuthToken authToken;
+ private String transferId;
+ private FileResourceMetadata metadata;
+
+ public String getResourceServiceHost() {
+ return resourceServiceHost;
+ }
+
+ public void setResourceServiceHost(String resourceServiceHost) {
+ this.resourceServiceHost = resourceServiceHost;
+ }
+
+ public int getResourceServicePort() {
+ return resourceServicePort;
+ }
+
+ public void setResourceServicePort(int resourceServicePort) {
+ this.resourceServicePort = resourceServicePort;
+ }
+
+ public String getSecretServiceHost() {
+ return secretServiceHost;
+ }
+
+ public void setSecretServiceHost(String secretServiceHost) {
+ this.secretServiceHost = secretServiceHost;
+ }
+
+ public int getSecretServicePort() {
+ return secretServicePort;
+ }
+
+ public void setSecretServicePort(int secretServicePort) {
+ this.secretServicePort = secretServicePort;
+ }
+
+ public String getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(String resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public String getCredentialToken() {
+ return credentialToken;
+ }
+
+ public void setCredentialToken(String credentialToken) {
+ this.credentialToken = credentialToken;
+ }
+
+ public AuthToken getAuthToken() {
+ return authToken;
+ }
+
+ public void setAuthToken(AuthToken authToken) {
+ this.authToken = authToken;
+ }
+
+ public String getTransferId() {
+ return transferId;
+ }
+
+ public void setTransferId(String transferId) {
+ this.transferId = transferId;
+ }
+
+ public FileResourceMetadata getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(FileResourceMetadata metadata) {
+ this.metadata = metadata;
+ }
+
+
+ public static final class ConnectorConfigBuilder {
+ private String resourceServiceHost;
+ private int resourceServicePort;
+ private String secretServiceHost;
+ private int secretServicePort;
+ private String resourceId;
+ private String credentialToken;
+ private AuthToken authToken;
+ private String transferId;
+ private FileResourceMetadata metadata;
+
+ private ConnectorConfigBuilder() {
+ }
+
+ public static ConnectorConfigBuilder newBuilder() {
+ return new ConnectorConfigBuilder();
+ }
+
+ public ConnectorConfigBuilder withResourceServiceHost(String resourceServiceHost) {
+ this.resourceServiceHost = resourceServiceHost;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withResourceServicePort(int resourceServicePort) {
+ this.resourceServicePort = resourceServicePort;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withSecretServiceHost(String secretServiceHost) {
+ this.secretServiceHost = secretServiceHost;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withSecretServicePort(int secretServicePort) {
+ this.secretServicePort = secretServicePort;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withResourceId(String resourceId) {
+ this.resourceId = resourceId;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withCredentialToken(String credentialToken) {
+ this.credentialToken = credentialToken;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withAuthToken(AuthToken authToken) {
+ this.authToken = authToken;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withTransferId(String transferId) {
+ this.transferId = transferId;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withMetadata(FileResourceMetadata metadata) {
+ this.metadata = metadata;
+ return this;
+ }
+
+ public ConnectorConfig build() {
+ ConnectorConfig connectorConfig = new ConnectorConfig();
+ connectorConfig.setResourceServiceHost(resourceServiceHost);
+ connectorConfig.setResourceServicePort(resourceServicePort);
+ connectorConfig.setSecretServiceHost(secretServiceHost);
+ connectorConfig.setSecretServicePort(secretServicePort);
+ connectorConfig.setResourceId(resourceId);
+ connectorConfig.setCredentialToken(credentialToken);
+ connectorConfig.setAuthToken(authToken);
+ connectorConfig.setTransferId(transferId);
+ connectorConfig.setMetadata(metadata);
+ return connectorConfig;
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
new file mode 100644
index 0000000..b805bf8
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.InputStream;
+
+public interface IncomingConnector {
+ public void init(ConnectorConfig connectorConfig) throws Exception;
+ public InputStream fetchInputStream() throws Exception;
+ public InputStream fetchInputStream(String childPath) throws Exception;
+ public void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
new file mode 100644
index 0000000..9ab495c
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.OutputStream;
+
+public interface OutgoingConnector {
+ public void init(ConnectorConfig connectorConfig) throws Exception;
+ public OutputStream fetchOutputStream() throws Exception;
+ public OutputStream fetchOutputStream(String childPath) throws Exception;
+ public void complete() throws Exception;
+}
diff --git a/scripts/build.sh b/scripts/build.sh
index 5bb68ab..2d5e4af 100755
--- a/scripts/build.sh
+++ b/scripts/build.sh
@@ -19,6 +19,7 @@
cd ../
mvn clean install
+rm -rf build
mkdir -p build
cp agent/target/MFT-Agent-0.01-bin.zip build/
cp controller/target/MFT-Controller-0.01-bin.zip build/
@@ -30,4 +31,4 @@ unzip -o build/MFT-Agent-0.01-bin.zip -d build/
unzip -o build/MFT-Controller-0.01-bin.zip -d build/
unzip -o build/Resource-Service-0.01-bin.zip -d build/
unzip -o build/Secret-Service-0.01-bin.zip -d build/
-unzip -o build/API-Service-0.01-bin.zip -d build/
\ No newline at end of file
+unzip -o build/API-Service-0.01-bin.zip -d build/
diff --git a/services/resource-service/server/src/main/resources/applicationContext.xml b/services/resource-service/server/src/main/resources/applicationContext.xml
index 2fbd7eb..ea845a3 100644
--- a/services/resource-service/server/src/main/resources/applicationContext.xml
+++ b/services/resource-service/server/src/main/resources/applicationContext.xml
@@ -6,7 +6,7 @@
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
- <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.datalake.DatalakeResourceBackend"
+ <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.file.FileBasedResourceBackend"
init-method="init" destroy-method="destroy"></bean>
</beans>
\ No newline at end of file
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
new file mode 100644
index 0000000..806f7c2
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LimitInputStream extends FilterInputStream implements Channel {
+ private final AtomicBoolean open = new AtomicBoolean(true);
+ private long remaining;
+
+ public LimitInputStream(InputStream in, long length) {
+ super(in);
+ this.remaining = length;
+ }
+
+ public boolean isOpen() {
+ return this.open.get();
+ }
+
+ public int read() throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("read() - stream is closed (remaining=" + this.remaining + ")");
+ } else if (this.remaining > 0L) {
+ --this.remaining;
+ return super.read();
+ } else {
+ return -1;
+ }
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("read(len=" + len + ") stream is closed (remaining=" + this.remaining + ")");
+ } else {
+ int nb = len;
+ if ((long)len > this.remaining) {
+ nb = (int)this.remaining;
+ }
+
+ if (nb > 0) {
+ int read = super.read(b, off, nb);
+ this.remaining -= (long)read;
+ return read;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ public long skip(long n) throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("skip(" + n + ") stream is closed (remaining=" + this.remaining + ")");
+ } else {
+ long skipped = super.skip(n);
+ this.remaining -= skipped;
+ return skipped;
+ }
+ }
+
+ public int available() throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("available() stream is closed (remaining=" + this.remaining + ")");
+ } else {
+ int av = super.available();
+ return (long)av > this.remaining ? (int)this.remaining : av;
+ }
+ }
+
+ public void close() throws IOException {
+ if (!this.open.getAndSet(false)) {
+ ;
+ }
+ }
+}
+
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
new file mode 100644
index 0000000..7856a3a
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public final class SCPIncomingConnector implements IncomingConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(SCPIncomingConnector.class);
+
+ private Session session;
+ private GenericResource resource;
+ private Channel channel;
+ private OutputStream out;
+ private InputStream in;
+ private final byte[] buf = new byte[1024];
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ SCPSecret scpSecret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+ }
+
+ SCPStorage scpStorage = resource.getScpStorage();
+ logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+ this.session = SCPTransportUtil.createSession(
+ scpSecret.getUser(),
+ scpStorage.getHost(),
+ scpStorage.getPort(),
+ scpSecret.getPrivateKey().getBytes(),
+ scpSecret.getPublicKey().getBytes(),
+ scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+ if (session == null) {
+ logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ }
+ }
+
+ @Override
+ public InputStream fetchInputStream() throws Exception {
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ resourcePath = resource.getFile().getResourcePath();
+ break;
+ case DIRECTORY:
+ throw new Exception("A directory path can not be streamed");
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchInputStreamJCraft(resourcePath);
+ }
+
+ @Override
+ public InputStream fetchInputStream(String childPath) throws Exception {
+
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ throw new Exception("A child path can not be associated with a file parent");
+ case DIRECTORY:
+ resourcePath = resource.getDirectory().getResourcePath();
+ if (!childPath.startsWith(resourcePath)) {
+ throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+ }
+ resourcePath = childPath;
+ break;
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchInputStreamJCraft(resourcePath);
+ }
+
+ private InputStream fetchInputStreamJCraft(String resourcePath) throws Exception{
+ String command = "scp -f " + resourcePath;
+ channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ out = channel.getOutputStream();
+ in = channel.getInputStream();
+
+ channel.connect();
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ while (true) {
+ int c = checkAck(in);
+ if (c != 'C') {
+ break;
+ }
+
+ // read '0644 '
+ in.read(buf, 0, 5);
+
+ long filesize = 0L;
+ while (true) {
+ if (in.read(buf, 0, 1) < 0) {
+ // error
+ break;
+ }
+ if (buf[0] == ' ') break;
+ filesize = filesize * 10L + (long) (buf[0] - '0');
+ }
+
+ String file = null;
+ for (int i = 0; ; i++) {
+ in.read(buf, i, 1);
+ if (buf[i] == (byte) 0x0a) {
+ file = new String(buf, 0, i);
+ break;
+ }
+ }
+
+ logger.info("file-size=" + filesize + ", file=" + file);
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ // read a content of lfile
+ return new LimitInputStream(in, filesize);
+ }
+ return null;
+ }
+
+ @Override
+ public void complete() throws Exception {
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ channel.disconnect();
+ session.disconnect();
+ }
+
+ private int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ // b may be 0 for success,
+ // 1 for error,
+ // 2 for fatal error,
+ // -1
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ if (b == 1) { // error
+ logger.error("Check Ack Failure b = 1 " + sb.toString());
+ }
+ if (b == 2) { // fatal error
+ logger.error("Check Ack Failure b = 2 " + sb.toString());
+ }
+ }
+ return b;
+ }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index a6c11c0..882f8d2 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -212,7 +212,7 @@ public class SCPMetadataCollector implements MetadataCollector {
}
if (rri.isRegularFile()) {
- FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.getBuilder()
+ FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.newBuilder()
.withFriendlyName(rri.getName())
.withResourcePath(rri.getPath())
.withCreatedTime(rri.getAttributes().getAtime())
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
new file mode 100644
index 0000000..4df4649
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public final class SCPOutgoingConnector implements OutgoingConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(SCPOutgoingConnector.class);
+
+ private GenericResource resource;
+ private Session session;
+ private OutputStream out;
+ private InputStream in;
+ private Channel channel;
+ private ConnectorConfig cc;
+ private final byte[] buf = new byte[1024];
+
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+
+ this.cc = cc;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ SCPSecret scpSecret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+ }
+
+ SCPStorage scpStorage = resource.getScpStorage();
+ logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+ this.session = SCPTransportUtil.createSession(
+ scpSecret.getUser(),
+ scpStorage.getHost(),
+ scpStorage.getPort(),
+ scpSecret.getPrivateKey().getBytes(),
+ scpSecret.getPublicKey().getBytes(),
+ scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+ if (session == null) {
+ logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ }
+ }
+
+ @Override
+ public OutputStream fetchOutputStream() throws Exception {
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ resourcePath = resource.getFile().getResourcePath();
+ break;
+ case DIRECTORY:
+ throw new Exception("A directory path can not be streamed");
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchOutputStreamJCraft(resourcePath, cc.getMetadata().getResourceSize());
+ }
+
+ @Override
+ public OutputStream fetchOutputStream(String childPath) throws Exception {
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ throw new Exception("A child path can not be associated with a file parent");
+ case DIRECTORY:
+ resourcePath = resource.getDirectory().getResourcePath();
+ if (!childPath.startsWith(resourcePath)) {
+ throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+ }
+ resourcePath = childPath;
+ break;
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchOutputStreamJCraft(resourcePath, cc.getMetadata().getResourceSize());
+ }
+
+ public OutputStream fetchOutputStreamJCraft(String resourcePath, long fileSize) throws Exception {
+ boolean ptimestamp = true;
+
+ // exec 'scp -t rfile' remotely
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + resourcePath;
+ channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ out = channel.getOutputStream();
+ in = channel.getInputStream();
+
+ channel.connect();
+
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+
+ if (ptimestamp) {
+ command = "T" + (System.currentTimeMillis() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible with JavaAPI ;-<
+ command += (" " + (System.currentTimeMillis() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+ }
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ command = "C0644 " + fileSize + " ";
+ if (resourcePath.lastIndexOf('/') > 0) {
+ command += resourcePath.substring(resourcePath.lastIndexOf('/') + 1);
+ } else {
+ command += resourcePath;
+ }
+
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+
+ return out;
+ }
+
+ @Override
+ public void complete() throws Exception {
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+ out.close();
+ channel.disconnect();
+ session.disconnect();
+ }
+
+ public int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ // b may be 0 for success,
+ // 1 for error,
+ // 2 for fatal error,
+ // -1
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ if (b == 1) { // error
+ logger.error("Check Ack Failure b = 1 " + sb.toString());
+ }
+ if (b == 2) { // fatal error
+ logger.error("Check Ack Failure b = 2 " + sb.toString());
+ }
+ }
+ return b;
+ }
+}