You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/08/22 16:42:20 UTC
[airavata-mft] 14/22: Improve code
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 7a951d285465238ff1093131714e3361ef54bd55
Author: isururanawaka <ir...@gmail.com>
AuthorDate: Mon Aug 19 11:00:52 2019 +0530
Improve code
---
.../airavata/mft/core/api/CompletionCallback.java | 1 +
.../apache/airavata/mft/core/api/Connector.java | 15 ++------
.../org/apache/airavata/mft/core/api/Mediator.java | 1 +
.../airavata/mft/core/api/SinkConnector.java | 9 +----
.../airavata/mft/core/api/SourceConnector.java | 12 ++----
.../mft/core/bufferedImpl/ConnectorConfig.java | 39 -------------------
.../airavata/mft/core/bufferedImpl/Constants.java | 8 ++--
.../{ => channel}/AbstractConnector.java | 3 +-
.../bufferedImpl/{ => channel}/ChannelUtils.java | 11 ++++--
.../core/bufferedImpl/{ => channel}/InChannel.java | 8 +---
.../bufferedImpl/{ => channel}/OutChannel.java | 6 +--
.../{ => mediation}/PassthroughMediator.java | 5 ++-
.../org/apache/airavata/mft/transport/s3/Main.java | 44 ++++++++++------------
.../airavata/mft/transport/s3/S3Constants.java | 18 ++++-----
.../airavata/mft/transport/s3/S3SinkConnector.java | 21 +++++------
.../mft/transport/s3/S3SourceConnector.java | 23 ++++++-----
16 files changed, 83 insertions(+), 141 deletions(-)
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java b/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java
index 657b017..f80c398 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java
@@ -32,6 +32,7 @@ public interface CompletionCallback {
/**
* Implementation of this should contain the logic
* for execution after mediation returns
+ *
* @param message
* @param error
*/
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
index 8627403..191ccc0 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
@@ -19,32 +19,25 @@
package org.apache.airavata.mft.core.api;
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
-
-import java.util.Properties;
/**
* This represents a connection between external source or sink
*/
public interface Connector {
- /**
- * Initiates the connector object
- * @param connectorConfig
- * @return initation state whether success or not
- */
- boolean initiate(ConnectorConfig connectorConfig);
/**
* This returns a {@link ConnectorChannel}
+ *
* @return Channel
*/
- ConnectorChannel openChannel(Properties properties) throws Exception;
+ ConnectorChannel openChannel() throws Exception;
/**
* This is used to close the channel and release resources related to channel
+ *
* @param channel
* @throws Exception
*/
- void closeChannel(ConnectorChannel channel) throws Exception;
+ void closeChannel(ConnectorChannel channel) throws Exception;
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java b/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java
index 76e1f69..92c479a 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java
@@ -29,6 +29,7 @@ public interface Mediator {
/**
* Mediates the content from source connector and writes the mediated content
* to destination connector
+ *
* @param src
* @param dst
* @param callback
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
index dd02e10..da5137f 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
@@ -19,22 +19,17 @@
package org.apache.airavata.mft.core.api;
-import java.util.Properties;
/**
* This represents the output connector, where to write
* data from the application.
*/
-public interface SinkConnector extends Connector{
+public interface SinkConnector extends Connector {
- /**
- * provides the channel to write data to external location
- * @return ConnectorChannel
- */
- ConnectorChannel openChannel(Properties properties) throws Exception;
/**
* Verify content upload of the given channel is completed
+ *
* @param channel
* @return true if succes else false
*/
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java
index a6b1ac5..db9922e 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java
@@ -19,19 +19,13 @@
package org.apache.airavata.mft.core.api;
-import java.util.Properties;
-
/**
* This represents the input connector, where to read data
- * from the application
+ * from the application. Use this connector to specify reader related
+ * methods
*/
-public interface SourceConnector extends Connector{
+public interface SourceConnector extends Connector {
- /**
- * provides the channel to read data from external location
- * @return ConnectorChannel
- */
- ConnectorChannel openChannel(Properties properties);
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ConnectorConfig.java
deleted file mode 100644
index 240cd2a..0000000
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ConnectorConfig.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.airavata.mft.core.bufferedImpl;
-
-
-import java.util.Properties;
-
-/**
- * This class wraps all the content related to connector
- */
-public class ConnectorConfig {
-
- private Properties properties;
-
- public ConnectorConfig(Properties properties) {
- this.properties = properties;
- }
-
- public String getValue(String key) {
- return properties.getProperty(key);
- }
-}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java
index 1f5b46c..93ee219 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java
@@ -22,9 +22,9 @@ package org.apache.airavata.mft.core.bufferedImpl;
/**
* Contains all the constants used in core classes.
*/
-public interface Constants {
+public class Constants {
- long TRANSFER_MAX_SIZE = (1024 * 1024);
- int BUFFER_SIZE = 8 * 1024;
- String CONNECTOR = "CONNECTOR";
+ public static long TRANSFER_MAX_SIZE = (1024 * 1024);
+ public static int BUFFER_SIZE = 8 * 1024;
+ public static String CONNECTOR = "CONNECTOR";
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/AbstractConnector.java
similarity index 92%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/AbstractConnector.java
index 8849aac..de119a5 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/AbstractConnector.java
@@ -17,12 +17,13 @@
* under the License.
*/
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.ConnectorChannel;
import org.apache.airavata.mft.core.api.SinkConnector;
import org.apache.airavata.mft.core.api.SourceConnector;
+import org.apache.airavata.mft.core.bufferedImpl.ConnectorException;
import java.io.IOException;
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ChannelUtils.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/ChannelUtils.java
similarity index 90%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ChannelUtils.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/ChannelUtils.java
index 1172142..7a402c6 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ChannelUtils.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/ChannelUtils.java
@@ -17,7 +17,9 @@
* under the License.
*/
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
+
+import org.apache.airavata.mft.core.bufferedImpl.Constants;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -32,6 +34,7 @@ public class ChannelUtils {
/**
* Tranfer data from readable byte channel to FileChannel using zero-copy
+ *
* @param byteChannel
* @param to
* @throws IOException
@@ -46,6 +49,7 @@ public class ChannelUtils {
/**
* Transfer data from FileChannel to writable byte channel using zero-copy
+ *
* @param to
* @param from
* @throws IOException
@@ -60,14 +64,15 @@ public class ChannelUtils {
/**
* Copy data from readable byte channel to writable byte channel
+ *
* @param src
* @param dest
* @throws IOException
*/
- public static void copyData(ReadableByteChannel src, WritableByteChannel dest) throws IOException {
+ public static void copyData(ReadableByteChannel src, WritableByteChannel dest) throws IOException {
final ByteBuffer buffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
int count = 0;
- while ((count =src.read(buffer)) != -1) {
+ while ((count = src.read(buffer)) != -1) {
// prepare the buffer to be drained
buffer.flip();
// write to the channel, may block
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/InChannel.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/InChannel.java
similarity index 91%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/InChannel.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/InChannel.java
index 542f283..5e2ab4c 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/InChannel.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/InChannel.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
import org.apache.airavata.mft.core.api.ConnectorChannel;
@@ -45,10 +45,6 @@ public class InChannel implements ConnectorChannel {
}
- public InputStream getInputStream() {
- return inputStream;
- }
-
@Override
public Channel getChannel() {
@@ -62,7 +58,7 @@ public class InChannel implements ConnectorChannel {
@Override
public void addChannelAttribute(String key, Object value) {
- contextAttributeMap.put(key,value );
+ contextAttributeMap.put(key, value);
}
@Override
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/OutChannel.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/OutChannel.java
similarity index 93%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/OutChannel.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/OutChannel.java
index 5d4e3c8..1df09c3 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/OutChannel.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/OutChannel.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
import org.apache.airavata.mft.core.api.ConnectorChannel;
@@ -52,7 +52,7 @@ public class OutChannel implements ConnectorChannel {
@Override
public void closeChannel() throws IOException {
- outputStream.close();
+ outputStream.close();
}
@Override
@@ -61,7 +61,7 @@ public class OutChannel implements ConnectorChannel {
}
@Override
- public Object getChannelAttribute(String key){
+ public Object getChannelAttribute(String key) {
return contextAttributeMap.get(key);
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/mediation/PassthroughMediator.java
similarity index 93%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/mediation/PassthroughMediator.java
index af9b663..f6e2044 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/mediation/PassthroughMediator.java
@@ -17,9 +17,12 @@
* under the License.
*/
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.mediation;
import org.apache.airavata.mft.core.api.*;
+import org.apache.airavata.mft.core.bufferedImpl.ConnectorException;
+import org.apache.airavata.mft.core.bufferedImpl.Constants;
+import org.apache.airavata.mft.core.bufferedImpl.channel.ChannelUtils;
import java.io.IOException;
import java.nio.channels.FileChannel;
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
index af29276..76c5ae5 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
@@ -20,11 +20,8 @@
package org.apache.airavata.mft.transport.s3;
import org.apache.airavata.mft.core.api.ConnectorChannel;
-import org.apache.airavata.mft.core.bufferedImpl.AbstractConnector;
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
-import org.apache.airavata.mft.core.bufferedImpl.PassthroughMediator;
-
-import java.util.Properties;
+import org.apache.airavata.mft.core.bufferedImpl.channel.AbstractConnector;
+import org.apache.airavata.mft.core.bufferedImpl.mediation.PassthroughMediator;
/**
* For Testing
@@ -32,7 +29,6 @@ import java.util.Properties;
public class Main {
-
public static void main(String[] args) {
Main main = new Main();
@@ -41,30 +37,31 @@ public class Main {
}
public void execute() {
- Properties connectorConfProp = new Properties();
- connectorConfProp.put(S3Constants.ACCESS_KEY, "XXXX");
- connectorConfProp.put(S3Constants.SECRET_KEY, "YYYY");
- connectorConfProp.put(S3Constants.REGION, "us-east-2");
- Properties srcProp = new Properties();
- srcProp.put(S3Constants.BUCKET, "test");
- srcProp.put(S3Constants.REMOTE_FILE, "test.pdf");
+ S3ResourceIdentifier src = new S3ResourceIdentifier();
+ S3ResourceIdentifier dst = new S3ResourceIdentifier();
+
+ src.setAccessKey("XXX");
+ src.setSecretKey("YYY");
+ src.setRegion("us-east-2");
+ src.setBucket("test");
+ src.setRemoteFile("test.pdf");
+
+ dst.setAccessKey("XXX");
+ dst.setSecretKey("YYY");
+ dst.setRegion("us-east-2");
+ dst.setBucket("test");
+ dst.setRemoteFile("testA.pdf");
- Properties dstProp = new Properties();
- dstProp.put(S3Constants.BUCKET, "test");
- dstProp.put(S3Constants.REMOTE_FILE, "teemure.pdf");
try {
- ConnectorConfig connectorConfig = new ConnectorConfig(connectorConfProp);
- AbstractConnector s3SourceConnector = new S3SourceConnector();
- s3SourceConnector.initiate(connectorConfig);
- ConnectorChannel srcChannel = s3SourceConnector.openChannel(srcProp);
+ AbstractConnector s3SourceConnector = new S3SourceConnector(src);
+ ConnectorChannel srcChannel = s3SourceConnector.openChannel();
- AbstractConnector s3SinkConnector = new S3SinkConnector();
- s3SinkConnector.initiate(connectorConfig);
- ConnectorChannel dstChannel = s3SinkConnector.openChannel(dstProp);
+ AbstractConnector s3SinkConnector = new S3SinkConnector(dst);
+ ConnectorChannel dstChannel = s3SinkConnector.openChannel();
PassthroughMediator passthroughMediator = new PassthroughMediator();
@@ -80,7 +77,6 @@ public class Main {
// };
-
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java
index 8058ca0..9103319 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java
@@ -22,14 +22,14 @@ package org.apache.airavata.mft.transport.s3;
/**
* Includes constants related to S3 SDK
*/
-public interface S3Constants {
+public class S3Constants {
- String ACCESS_KEY="ACCESS_KEY";
- String SECRET_KEY="SECRET_KEY";
- String BUCKET="BUCKET";
- String REGION="REGION";
- String REMOTE_FILE="REMOTE_FILE";
- int CONNECTION_EXPIRE_TIME = 1000 * 60 * 60;
- String HTTP_CONNECTION = "HTTP_CONNECTION";
- int HTTP_SUCCESS_RESPONSE_CODE = 200;
+ public static String ACCESS_KEY = "ACCESS_KEY";
+ public static String SECRET_KEY = "SECRET_KEY";
+ public static String BUCKET = "BUCKET";
+ public static String REGION = "REGION";
+ public static String REMOTE_FILE = "REMOTE_FILE";
+ public static int CONNECTION_EXPIRE_TIME = 1000 * 60 * 60;
+ public static String HTTP_CONNECTION = "HTTP_CONNECTION";
+ public static int HTTP_SUCCESS_RESPONSE_CODE = 200;
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
index 3ee2bae..2434332 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
@@ -24,16 +24,14 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import org.apache.airavata.mft.core.api.ConnectorChannel;
import org.apache.airavata.mft.core.api.SinkConnector;
-import org.apache.airavata.mft.core.bufferedImpl.AbstractConnector;
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
import org.apache.airavata.mft.core.bufferedImpl.Constants;
-import org.apache.airavata.mft.core.bufferedImpl.OutChannel;
+import org.apache.airavata.mft.core.bufferedImpl.channel.AbstractConnector;
+import org.apache.airavata.mft.core.bufferedImpl.channel.OutChannel;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.util.Properties;
/**
* Connector class which connects to a given S3 destination and provides
@@ -41,24 +39,23 @@ import java.util.Properties;
*/
public class S3SinkConnector extends AbstractConnector implements SinkConnector {
private AmazonS3 s3Client;
+ private S3ResourceIdentifier s3ResourceIdentifier;
-
- @Override
- public boolean initiate(ConnectorConfig connectorConfig) {
- s3Client = S3TransportUtil.getS3Client(connectorConfig.getValue(S3Constants.ACCESS_KEY),
- connectorConfig.getValue(S3Constants.SECRET_KEY), connectorConfig.getValue(S3Constants.REGION));
- return true;
+ public S3SinkConnector(S3ResourceIdentifier s3ResourceIdentifier) {
+ this.s3ResourceIdentifier = s3ResourceIdentifier;
+ this.s3Client = S3TransportUtil.getS3Client(s3ResourceIdentifier.getAccessKey(),
+ s3ResourceIdentifier.getSecretKey(), s3ResourceIdentifier.getRegion());
}
@Override
- public ConnectorChannel openChannel(Properties properties) throws IOException {
+ public ConnectorChannel openChannel() throws IOException {
java.util.Date expiration = new java.util.Date();
long expTimeMillis = expiration.getTime();
expTimeMillis += S3Constants.CONNECTION_EXPIRE_TIME;
expiration.setTime(expTimeMillis);
GeneratePresignedUrlRequest generatePresignedUrlRequest = new GeneratePresignedUrlRequest
- (properties.getProperty(S3Constants.BUCKET), properties.getProperty(S3Constants.REMOTE_FILE))
+ (s3ResourceIdentifier.getBucket(), s3ResourceIdentifier.getRemoteFile())
.withMethod(HttpMethod.PUT)
.withExpiration(expiration);
URL url = s3Client.generatePresignedUrl(generatePresignedUrlRequest);
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
index 6539d04..8f486be 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
@@ -23,13 +23,11 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.airavata.mft.core.api.ConnectorChannel;
import org.apache.airavata.mft.core.api.SourceConnector;
-import org.apache.airavata.mft.core.bufferedImpl.AbstractConnector;
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
import org.apache.airavata.mft.core.bufferedImpl.Constants;
-import org.apache.airavata.mft.core.bufferedImpl.InChannel;
+import org.apache.airavata.mft.core.bufferedImpl.channel.AbstractConnector;
+import org.apache.airavata.mft.core.bufferedImpl.channel.InChannel;
import java.io.InputStream;
-import java.util.Properties;
/**
* Connector class which connects to a given S3 source and provides
@@ -39,17 +37,18 @@ public class S3SourceConnector extends AbstractConnector implements SourceConnec
private AmazonS3 s3Client;
- @Override
- public boolean initiate(ConnectorConfig connectorConfig) {
- s3Client = S3TransportUtil.getS3Client(connectorConfig.getValue(S3Constants.ACCESS_KEY),
- connectorConfig.getValue(S3Constants.SECRET_KEY), connectorConfig.getValue(S3Constants.REGION));
- return true;
+ private S3ResourceIdentifier identifier;
+
+ public S3SourceConnector(S3ResourceIdentifier identifier) {
+ this.identifier = identifier;
+ this.s3Client = S3TransportUtil.getS3Client(identifier.getAccessKey(),
+ identifier.getSecretKey(), identifier.getAccessKey());
}
@Override
- public ConnectorChannel openChannel(Properties properties) {
- S3Object s3object = s3Client.getObject(properties.getProperty(S3Constants.BUCKET),
- properties.getProperty(S3Constants.REMOTE_FILE));
+ public ConnectorChannel openChannel() {
+ S3Object s3object = s3Client.getObject(identifier.getBucket(),
+ identifier.getRemoteFile());
InputStream inputStream;
if (s3object != null && s3object.getObjectContent() != null) {
inputStream = s3object.getObjectContent();