You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/08/22 16:42:20 UTC

[airavata-mft] 14/22: Improve code

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit 7a951d285465238ff1093131714e3361ef54bd55
Author: isururanawaka <ir...@gmail.com>
AuthorDate: Mon Aug 19 11:00:52 2019 +0530

    Improve code
---
 .../airavata/mft/core/api/CompletionCallback.java  |  1 +
 .../apache/airavata/mft/core/api/Connector.java    | 15 ++------
 .../org/apache/airavata/mft/core/api/Mediator.java |  1 +
 .../airavata/mft/core/api/SinkConnector.java       |  9 +----
 .../airavata/mft/core/api/SourceConnector.java     | 12 ++----
 .../mft/core/bufferedImpl/ConnectorConfig.java     | 39 -------------------
 .../airavata/mft/core/bufferedImpl/Constants.java  |  8 ++--
 .../{ => channel}/AbstractConnector.java           |  3 +-
 .../bufferedImpl/{ => channel}/ChannelUtils.java   | 11 ++++--
 .../core/bufferedImpl/{ => channel}/InChannel.java |  8 +---
 .../bufferedImpl/{ => channel}/OutChannel.java     |  6 +--
 .../{ => mediation}/PassthroughMediator.java       |  5 ++-
 .../org/apache/airavata/mft/transport/s3/Main.java | 44 ++++++++++------------
 .../airavata/mft/transport/s3/S3Constants.java     | 18 ++++-----
 .../airavata/mft/transport/s3/S3SinkConnector.java | 21 +++++------
 .../mft/transport/s3/S3SourceConnector.java        | 23 ++++++-----
 16 files changed, 83 insertions(+), 141 deletions(-)

diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java b/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java
index 657b017..f80c398 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/CompletionCallback.java
@@ -32,6 +32,7 @@ public interface CompletionCallback {
     /**
      * Implementation of this should contain the logic
      * for execution after mediation returns
+     *
      * @param message
      * @param error
      */
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
index 8627403..191ccc0 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
@@ -19,32 +19,25 @@
 
 package org.apache.airavata.mft.core.api;
 
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
-
-import java.util.Properties;
 
 /**
  * This represents a connection between external source or sink
  */
 public interface Connector {
 
-    /**
-     * Initiates the connector object
-     * @param connectorConfig
-     * @return initation state whether success or not
-     */
-    boolean initiate(ConnectorConfig connectorConfig);
 
     /**
      * This returns a {@link ConnectorChannel}
+     *
      * @return Channel
      */
-    ConnectorChannel openChannel(Properties properties) throws Exception;
+    ConnectorChannel openChannel() throws Exception;
 
     /**
      * This is used to close the channel and release resources related to channel
+     *
      * @param channel
      * @throws Exception
      */
-    void closeChannel(ConnectorChannel channel) throws  Exception;
+    void closeChannel(ConnectorChannel channel) throws Exception;
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java b/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java
index 76e1f69..92c479a 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Mediator.java
@@ -29,6 +29,7 @@ public interface Mediator {
     /**
      * Mediates the content from source connector and writes the mediated content
      * to destination connector
+     *
      * @param src
      * @param dst
      * @param callback
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
index dd02e10..da5137f 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
@@ -19,22 +19,17 @@
 
 package org.apache.airavata.mft.core.api;
 
-import java.util.Properties;
 
 /**
  * This represents the output connector, where to write
  * data from the application.
  */
-public interface SinkConnector extends Connector{
+public interface SinkConnector extends Connector {
 
-    /**
-     * provides the channel to write data to external location
-     * @return ConnectorChannel
-     */
-     ConnectorChannel openChannel(Properties properties) throws Exception;
 
     /**
      * Verify content upload of the given channel is completed
+     *
      * @param channel
      * @return true if succes else false
      */
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java
index a6b1ac5..db9922e 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/SourceConnector.java
@@ -19,19 +19,13 @@
 
 package org.apache.airavata.mft.core.api;
 
-import java.util.Properties;
-
 
 /**
  * This represents the input connector, where to read data
- * from the application
+ * from the application. Use this connector to specify reader related
+ * methods
  */
-public interface SourceConnector extends Connector{
+public interface SourceConnector extends Connector {
 
-    /**
-     * provides the channel to read data from external location
-     * @return ConnectorChannel
-     */
-    ConnectorChannel openChannel(Properties properties);
 
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ConnectorConfig.java
deleted file mode 100644
index 240cd2a..0000000
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ConnectorConfig.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *   KIND, either express or implied.  See the License for the
- *   specific language governing permissions and limitations
- *   under the License.
- */
-
-package org.apache.airavata.mft.core.bufferedImpl;
-
-
-import java.util.Properties;
-
-/**
- * This class wraps all the content related to connector
- */
-public  class ConnectorConfig {
-
-    private Properties properties;
-
-    public ConnectorConfig(Properties properties) {
-        this.properties = properties;
-    }
-
-    public String getValue(String key) {
-        return properties.getProperty(key);
-    }
-}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java
index 1f5b46c..93ee219 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/Constants.java
@@ -22,9 +22,9 @@ package org.apache.airavata.mft.core.bufferedImpl;
 /**
  * Contains all the  constants used in core classes.
  */
-public interface Constants {
+public class Constants {
 
-    long TRANSFER_MAX_SIZE = (1024 * 1024);
-    int BUFFER_SIZE = 8 * 1024;
-    String CONNECTOR = "CONNECTOR";
+    public static final long TRANSFER_MAX_SIZE = (1024 * 1024); // declared final so the value cannot be reassigned
+    public static final int BUFFER_SIZE = 8 * 1024; // passed to ByteBuffer.allocateDirect in ChannelUtils.copyData
+    public static final String CONNECTOR = "CONNECTOR";
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/AbstractConnector.java
similarity index 92%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/AbstractConnector.java
index 8849aac..de119a5 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/AbstractConnector.java
@@ -17,12 +17,13 @@
  *   under the License.
  */
 
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
 
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.core.api.ConnectorChannel;
 import org.apache.airavata.mft.core.api.SinkConnector;
 import org.apache.airavata.mft.core.api.SourceConnector;
+import org.apache.airavata.mft.core.bufferedImpl.ConnectorException;
 
 import java.io.IOException;
 
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ChannelUtils.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/ChannelUtils.java
similarity index 90%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ChannelUtils.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/ChannelUtils.java
index 1172142..7a402c6 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/ChannelUtils.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/ChannelUtils.java
@@ -17,7 +17,9 @@
  *   under the License.
  */
 
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
+
+import org.apache.airavata.mft.core.bufferedImpl.Constants;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -32,6 +34,7 @@ public class ChannelUtils {
 
     /**
      * Tranfer data from readable byte channel to FileChannel using zero-copy
+     *
      * @param byteChannel
      * @param to
      * @throws IOException
@@ -46,6 +49,7 @@ public class ChannelUtils {
 
     /**
      * Transfer data from FileChannel to writable byte channel using zero-copy
+     *
      * @param to
      * @param from
      * @throws IOException
@@ -60,14 +64,15 @@ public class ChannelUtils {
 
     /**
      * Copy data from readable byte channel to writable byte channel
+     *
      * @param src
      * @param dest
      * @throws IOException
      */
-    public static void copyData(ReadableByteChannel src,  WritableByteChannel dest) throws IOException {
+    public static void copyData(ReadableByteChannel src, WritableByteChannel dest) throws IOException {
         final ByteBuffer buffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
         int count = 0;
-        while ((count =src.read(buffer)) != -1) {
+        while ((count = src.read(buffer)) != -1) {
             // prepare the buffer to be drained
             buffer.flip();
             // write to the channel, may block
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/InChannel.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/InChannel.java
similarity index 91%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/InChannel.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/InChannel.java
index 542f283..5e2ab4c 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/InChannel.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/InChannel.java
@@ -17,7 +17,7 @@
  *   under the License.
  */
 
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
 
 import org.apache.airavata.mft.core.api.ConnectorChannel;
 
@@ -45,10 +45,6 @@ public class InChannel implements ConnectorChannel {
 
     }
 
-    public InputStream getInputStream() {
-        return inputStream;
-    }
-
 
     @Override
     public Channel getChannel() {
@@ -62,7 +58,7 @@ public class InChannel implements ConnectorChannel {
 
     @Override
     public void addChannelAttribute(String key, Object value) {
-        contextAttributeMap.put(key,value );
+        contextAttributeMap.put(key, value);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/OutChannel.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/OutChannel.java
similarity index 93%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/OutChannel.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/OutChannel.java
index 5d4e3c8..1df09c3 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/OutChannel.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/channel/OutChannel.java
@@ -17,7 +17,7 @@
  *   under the License.
  */
 
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.channel;
 
 import org.apache.airavata.mft.core.api.ConnectorChannel;
 
@@ -52,7 +52,7 @@ public class OutChannel implements ConnectorChannel {
 
     @Override
     public void closeChannel() throws IOException {
-            outputStream.close();
+        outputStream.close();
     }
 
     @Override
@@ -61,7 +61,7 @@ public class OutChannel implements ConnectorChannel {
     }
 
     @Override
-    public Object getChannelAttribute(String key){
+    public Object getChannelAttribute(String key) {
         return contextAttributeMap.get(key);
     }
 
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/mediation/PassthroughMediator.java
similarity index 93%
rename from core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java
rename to core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/mediation/PassthroughMediator.java
index af9b663..f6e2044 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/mediation/PassthroughMediator.java
@@ -17,9 +17,12 @@
  *   under the License.
  */
 
-package org.apache.airavata.mft.core.bufferedImpl;
+package org.apache.airavata.mft.core.bufferedImpl.mediation;
 
 import org.apache.airavata.mft.core.api.*;
+import org.apache.airavata.mft.core.bufferedImpl.ConnectorException;
+import org.apache.airavata.mft.core.bufferedImpl.Constants;
+import org.apache.airavata.mft.core.bufferedImpl.channel.ChannelUtils;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
index af29276..76c5ae5 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
@@ -20,11 +20,8 @@
 package org.apache.airavata.mft.transport.s3;
 
 import org.apache.airavata.mft.core.api.ConnectorChannel;
-import org.apache.airavata.mft.core.bufferedImpl.AbstractConnector;
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
-import org.apache.airavata.mft.core.bufferedImpl.PassthroughMediator;
-
-import java.util.Properties;
+import org.apache.airavata.mft.core.bufferedImpl.channel.AbstractConnector;
+import org.apache.airavata.mft.core.bufferedImpl.mediation.PassthroughMediator;
 
 /**
  * For Testing
@@ -32,7 +29,6 @@ import java.util.Properties;
 public class Main {
 
 
-
     public static void main(String[] args) {
 
         Main main = new Main();
@@ -41,30 +37,31 @@ public class Main {
     }
 
     public void execute() {
-        Properties connectorConfProp = new Properties();
-        connectorConfProp.put(S3Constants.ACCESS_KEY, "XXXX");
-        connectorConfProp.put(S3Constants.SECRET_KEY, "YYYY");
-        connectorConfProp.put(S3Constants.REGION, "us-east-2");
 
-        Properties srcProp = new Properties();
-        srcProp.put(S3Constants.BUCKET, "test");
-        srcProp.put(S3Constants.REMOTE_FILE, "test.pdf");
+        S3ResourceIdentifier src = new S3ResourceIdentifier();
+        S3ResourceIdentifier dst = new S3ResourceIdentifier();
+
+        src.setAccessKey("XXX");
+        src.setSecretKey("YYY");
+        src.setRegion("us-east-2");
+        src.setBucket("test");
+        src.setRemoteFile("test.pdf");
+
+        dst.setAccessKey("XXX");
+        dst.setSecretKey("YYY");
+        dst.setRegion("us-east-2");
+        dst.setBucket("test");
+        dst.setRemoteFile("testA.pdf");
 
-        Properties dstProp = new Properties();
-        dstProp.put(S3Constants.BUCKET, "test");
-        dstProp.put(S3Constants.REMOTE_FILE, "teemure.pdf");
 
         try {
 
-            ConnectorConfig connectorConfig = new ConnectorConfig(connectorConfProp);
 
-            AbstractConnector s3SourceConnector = new S3SourceConnector();
-            s3SourceConnector.initiate(connectorConfig);
-            ConnectorChannel srcChannel = s3SourceConnector.openChannel(srcProp);
+            AbstractConnector s3SourceConnector = new S3SourceConnector(src);
+            ConnectorChannel srcChannel = s3SourceConnector.openChannel();
 
-            AbstractConnector s3SinkConnector = new S3SinkConnector();
-            s3SinkConnector.initiate(connectorConfig);
-            ConnectorChannel dstChannel = s3SinkConnector.openChannel(dstProp);
+            AbstractConnector s3SinkConnector = new S3SinkConnector(dst);
+            ConnectorChannel dstChannel = s3SinkConnector.openChannel();
 
             PassthroughMediator passthroughMediator = new PassthroughMediator();
 
@@ -80,7 +77,6 @@ public class Main {
             //  };
 
 
-
         } catch (Exception e) {
             e.printStackTrace();
         }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java
index 8058ca0..9103319 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Constants.java
@@ -22,14 +22,14 @@ package org.apache.airavata.mft.transport.s3;
 /**
  * Includes constants related to S3 SDK
  */
-public interface S3Constants {
+public class S3Constants {
 
-    String ACCESS_KEY="ACCESS_KEY";
-    String SECRET_KEY="SECRET_KEY";
-    String BUCKET="BUCKET";
-    String REGION="REGION";
-    String REMOTE_FILE="REMOTE_FILE";
-    int CONNECTION_EXPIRE_TIME = 1000 * 60 * 60;
-    String HTTP_CONNECTION = "HTTP_CONNECTION";
-    int HTTP_SUCCESS_RESPONSE_CODE = 200;
+    public static final String ACCESS_KEY = "ACCESS_KEY";
+    public static final String SECRET_KEY = "SECRET_KEY";
+    public static final String BUCKET = "BUCKET";
+    public static final String REGION = "REGION";
+    public static final String REMOTE_FILE = "REMOTE_FILE";
+    public static final int CONNECTION_EXPIRE_TIME = 1000 * 60 * 60; // added to Date.getTime() (milliseconds) for the presigned URL expiry
+    public static final String HTTP_CONNECTION = "HTTP_CONNECTION";
+    public static final int HTTP_SUCCESS_RESPONSE_CODE = 200;
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
index 3ee2bae..2434332 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
@@ -24,16 +24,14 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
 import org.apache.airavata.mft.core.api.ConnectorChannel;
 import org.apache.airavata.mft.core.api.SinkConnector;
-import org.apache.airavata.mft.core.bufferedImpl.AbstractConnector;
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
 import org.apache.airavata.mft.core.bufferedImpl.Constants;
-import org.apache.airavata.mft.core.bufferedImpl.OutChannel;
+import org.apache.airavata.mft.core.bufferedImpl.channel.AbstractConnector;
+import org.apache.airavata.mft.core.bufferedImpl.channel.OutChannel;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.Properties;
 
 /**
  * Connector class which connects to a given S3 destination and provides
@@ -41,24 +39,23 @@ import java.util.Properties;
  */
 public class S3SinkConnector extends AbstractConnector implements SinkConnector {
     private AmazonS3 s3Client;
+    private S3ResourceIdentifier s3ResourceIdentifier;
 
-
-    @Override
-    public boolean initiate(ConnectorConfig connectorConfig) {
-        s3Client = S3TransportUtil.getS3Client(connectorConfig.getValue(S3Constants.ACCESS_KEY),
-                connectorConfig.getValue(S3Constants.SECRET_KEY), connectorConfig.getValue(S3Constants.REGION));
-        return true;
+    public S3SinkConnector(S3ResourceIdentifier s3ResourceIdentifier) {
+        this.s3ResourceIdentifier = s3ResourceIdentifier;
+        this.s3Client = S3TransportUtil.getS3Client(s3ResourceIdentifier.getAccessKey(),
+                s3ResourceIdentifier.getSecretKey(), s3ResourceIdentifier.getRegion());
     }
 
     @Override
-    public ConnectorChannel openChannel(Properties properties) throws IOException {
+    public ConnectorChannel openChannel() throws IOException {
         java.util.Date expiration = new java.util.Date();
         long expTimeMillis = expiration.getTime();
         expTimeMillis += S3Constants.CONNECTION_EXPIRE_TIME;
         expiration.setTime(expTimeMillis);
 
         GeneratePresignedUrlRequest generatePresignedUrlRequest = new GeneratePresignedUrlRequest
-                (properties.getProperty(S3Constants.BUCKET), properties.getProperty(S3Constants.REMOTE_FILE))
+                (s3ResourceIdentifier.getBucket(), s3ResourceIdentifier.getRemoteFile())
                 .withMethod(HttpMethod.PUT)
                 .withExpiration(expiration);
         URL url = s3Client.generatePresignedUrl(generatePresignedUrlRequest);
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
index 6539d04..8f486be 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
@@ -23,13 +23,11 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.S3Object;
 import org.apache.airavata.mft.core.api.ConnectorChannel;
 import org.apache.airavata.mft.core.api.SourceConnector;
-import org.apache.airavata.mft.core.bufferedImpl.AbstractConnector;
-import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
 import org.apache.airavata.mft.core.bufferedImpl.Constants;
-import org.apache.airavata.mft.core.bufferedImpl.InChannel;
+import org.apache.airavata.mft.core.bufferedImpl.channel.AbstractConnector;
+import org.apache.airavata.mft.core.bufferedImpl.channel.InChannel;
 
 import java.io.InputStream;
-import java.util.Properties;
 
 /**
  * Connector class which connects to a given S3 source and provides
@@ -39,17 +37,18 @@ public class S3SourceConnector extends AbstractConnector implements SourceConnec
 
     private AmazonS3 s3Client;
 
-    @Override
-    public boolean initiate(ConnectorConfig connectorConfig) {
-        s3Client = S3TransportUtil.getS3Client(connectorConfig.getValue(S3Constants.ACCESS_KEY),
-                connectorConfig.getValue(S3Constants.SECRET_KEY), connectorConfig.getValue(S3Constants.REGION));
-        return true;
+    private S3ResourceIdentifier identifier;
+
+    public S3SourceConnector(S3ResourceIdentifier identifier) {
+        this.identifier = identifier;
+        this.s3Client = S3TransportUtil.getS3Client(identifier.getAccessKey(), // args: access key, secret key, region
+                identifier.getSecretKey(), identifier.getRegion());
     }
 
     @Override
-    public ConnectorChannel openChannel(Properties properties) {
-        S3Object s3object = s3Client.getObject(properties.getProperty(S3Constants.BUCKET),
-                properties.getProperty(S3Constants.REMOTE_FILE));
+    public ConnectorChannel openChannel() {
+        S3Object s3object = s3Client.getObject(identifier.getBucket(),
+                identifier.getRemoteFile());
         InputStream inputStream;
         if (s3object != null && s3object.getObjectContent() != null) {
             inputStream = s3object.getObjectContent();