You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/07/05 10:59:27 UTC

[incubator-wayang] 03/03: [WAYANG-S3] support for s3 in apache wayang

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch s3
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f232a5ed35da32a549baaa6ea7db962b2ea1f483
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Tue Jul 5 12:58:38 2022 +0200

    [WAYANG-S3] support for s3 in apache wayang
    
    Signed-off-by: bertty <be...@apache.org>
---
 pom.xml                                            |   2 +-
 wayang-benchmark/pom.xml                           |   5 +
 wayang-commons/pom.xml                             |   8 +
 wayang-commons/wayang-core/pom.xml                 |  22 +-
 .../apache/wayang/core/util/fs/FileSystems.java    |   3 +-
 .../apache/wayang/core/util/fs/S3FileSystem.java   | 232 +++++++++++++++++++++
 .../wayang/spark/platform/SparkPlatform.java       |  17 +-
 wayang-platforms/wayang-spark/pom.xml              |  12 +-
 8 files changed, 296 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 36ab3ede..b506f3b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,7 @@
         <mockito.version>3.5.10</mockito.version>
         <mockk.version>1.10.0</mockk.version>
         <external.platforms.scope>provided</external.platforms.scope>
-        <hadoop.version>2.7.7</hadoop.version>
+        <hadoop.version>3.1.2</hadoop.version>
         <!-- To be overridden by individual modules -->
         <java-module-name>org.apache.wayang.default</java-module-name>
         <code.coverage.project.folder>${basedir}/</code.coverage.project.folder>
diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml
index f230ee09..7df47adf 100644
--- a/wayang-benchmark/pom.xml
+++ b/wayang-benchmark/pom.xml
@@ -59,6 +59,11 @@
       <artifactId>wayang-sqlite3</artifactId>
       <version>0.6.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>3.1.2</version>
+    </dependency>
   </dependencies>
 
   <modules>
diff --git a/wayang-commons/pom.xml b/wayang-commons/pom.xml
index fdb0e20f..29941082 100644
--- a/wayang-commons/pom.xml
+++ b/wayang-commons/pom.xml
@@ -81,6 +81,10 @@
                         <groupId>com.thoughtworks.paranamer</groupId>
                         <artifactId>paranamer</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>commons-httpclient</groupId>
+                        <artifactId>commons-httpclient</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
@@ -165,6 +169,10 @@
                         <groupId>com.thoughtworks.paranamer</groupId>
                         <artifactId>paranamer</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>commons-httpclient</groupId>
+                        <artifactId>commons-httpclient</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
diff --git a/wayang-commons/wayang-core/pom.xml b/wayang-commons/wayang-core/pom.xml
index 780c04ad..b170a49d 100644
--- a/wayang-commons/wayang-core/pom.xml
+++ b/wayang-commons/wayang-core/pom.xml
@@ -37,7 +37,17 @@
     <properties>
         <java-module-name>org.apache.wayang.core</java-module-name>
     </properties>
-
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>1.12.253</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
     <dependencies>
         <dependency>
             <groupId>org.yaml</groupId>
@@ -93,6 +103,16 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>1.12.253</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
index 21ae249e..087a64d2 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
@@ -45,7 +45,8 @@ public class FileSystems {
 
     private static Collection<FileSystem> registeredFileSystems = Arrays.asList(
             new LocalFileSystem(),
-            new HadoopFileSystem()
+            new HadoopFileSystem(),
+            new S3FileSystem()
     );
 
     private FileSystems() {
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
new file mode 100644
index 00000000..99fe3cb8
--- /dev/null
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
@@ -0,0 +1,232 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.wayang.core.util.fs;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.wayang.core.api.exception.WayangException;

/**
 * {@link FileSystem} implementation backed by Amazon S3.
 *
 * <p>URLs of the form {@code s3://bucket/key...} and {@code s3a://bucket/key...} are handled.
 * Credentials are taken from the JVM system properties {@code fs.s3.awsAccessKeyId} and
 * {@code fs.s3.awsSecretAccessKey} when both are present; otherwise the AWS SDK's default
 * credential chain is used.
 *
 * <p>NOTE(review): instances cache {@link S3Pair}s in a plain {@link HashMap}; this class is
 * not thread-safe — confirm callers use it from a single thread.
 */
public class S3FileSystem implements FileSystem {

  /** Client used for all S3 calls; created once in the constructor. */
  final AmazonS3 s3;

  /** Cache of parsed URL -> (bucket, key) pairs to avoid re-parsing. */
  final Map<String, S3Pair> pairs = new HashMap<>();

  public S3FileSystem() {
    // BUG FIX: the original used System.getProperties().contains(...), which is
    // Hashtable.contains and tests *values*, not keys — the explicit-credentials
    // branch could never trigger. containsKey is the intended check.
    if (System.getProperties().containsKey("fs.s3.awsAccessKeyId")
        && System.getProperties().containsKey("fs.s3.awsSecretAccessKey")) {
      BasicAWSCredentials awsCreds = new BasicAWSCredentials(
          System.getProperty("fs.s3.awsAccessKeyId"),
          System.getProperty("fs.s3.awsSecretAccessKey")
      );
      this.s3 = AmazonS3ClientBuilder.standard()
          .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
          .build();
    } else {
      // Falls back to the SDK default credential/region provider chain.
      this.s3 = AmazonS3ClientBuilder.defaultClient();
    }
  }

  /**
   * Manual smoke test: parses a URL, checks the parent folders, and uploads a small object.
   * Requires valid AWS credentials and network access; not part of the public API.
   */
  public static void main(String... args) throws IOException {
    S3FileSystem s3 = new S3FileSystem();
    String url = "s3://blossom-benchmark/lulu/lolo/lala";
    System.out.println(url);
    System.out.println(s3.getS3Pair(url).getBucket());
    System.out.println(s3.getS3Pair(url).getKey());
    System.out.println(s3.preFoldersExits(s3.getS3Pair(url)));

    OutputStream output = s3.create(url, true);
    byte[] bytes = "lala".getBytes();
    output.write(bytes);
    output.flush();
    output.close();
  }

  /**
   * Decomposition of an S3 URL into its bucket and key parts.
   */
  class S3Pair {

    private final String bucket;
    private final String key;

    /**
     * Parses {@code url} ({@code s3://bucket[/key]}) after validating it against
     * {@code s3Client.canHandle}.
     *
     * @throws WayangException if the URL is not an S3 URL
     */
    public S3Pair(S3FileSystem s3Client, String url) {
      if (!s3Client.canHandle(url)) {
        // BUG FIX: the original reported this.getClass().getSimpleName(), which is
        // "S3Pair" — the file system's name is the meaningful one here.
        throw new WayangException(
            "The URL cannot be handled by " + s3Client.getClass().getSimpleName());
      }
      // "s3://bucket/key/with/slashes" splits into ["s3:", "", "bucket", "key/with/slashes"].
      String[] parts = url.split("/", 4);
      String keyPart = "";
      if (parts.length == 4) {
        keyPart = parts[3];
      }
      this.bucket = parts[2];
      this.key = keyPart;
    }

    public S3Pair(String bucket, String key) {
      this.bucket = bucket;
      this.key = key;
    }

    public String getBucket() {
      return bucket;
    }

    public String getKey() {
      return key;
    }
  }

  /** Returns the cached {@link S3Pair} for {@code url}, parsing it on first use. */
  private S3Pair getS3Pair(String url) {
    return this.pairs.computeIfAbsent(url, u -> new S3Pair(this, u));
  }

  @Override
  public long getFileSize(String fileUrl) throws FileNotFoundException {
    return this.getFileSize(this.getS3Pair(fileUrl));
  }

  /**
   * Fetches the object's content length via a HEAD request.
   *
   * @throws FileNotFoundException if the object cannot be accessed (the SDK throws an
   *     unchecked exception, which the original leaked despite the declared checked one)
   */
  private long getFileSize(S3Pair pair) throws FileNotFoundException {
    try {
      return this.s3.getObjectMetadata(pair.getBucket(), pair.getKey()).getContentLength();
    } catch (RuntimeException e) {
      // NOTE(review): this also wraps auth/network failures, not only 404s — acceptable
      // for the declared contract, but confirm whether callers need to distinguish them.
      FileNotFoundException fnf = new FileNotFoundException(
          String.format("Could not access s3://%s/%s", pair.getBucket(), pair.getKey()));
      fnf.initCause(e);
      throw fnf;
    }
  }

  @Override
  public boolean canHandle(String url) {
    // BUG FIX: the original called url.substring(0, 5) unguarded (out-of-bounds on short
    // strings) and accepted only "s3a:/", while main() and the fs.s3.* properties use
    // "s3://". Accept both schemes, case-insensitively.
    if (url == null || url.length() < 5) {
      return false;
    }
    String prefix = url.substring(0, 5).toLowerCase(Locale.ROOT);
    return prefix.startsWith("s3a:/") || prefix.startsWith("s3:/");
  }

  @Override
  public InputStream open(String url) throws IOException {
    return this.open(this.getS3Pair(url));
  }

  /** Streams the object's content directly from S3. */
  private InputStream open(S3Pair pair) throws IOException {
    return this.s3.getObject(pair.getBucket(), pair.getKey()).getObjectContent();
  }

  @Override
  public OutputStream create(String url) throws IOException {
    return this.create(this.getS3Pair(url));
  }

  private OutputStream create(S3Pair pair) throws IOException {
    return this.create(pair, false);
  }

  @Override
  public OutputStream create(String url, Boolean forceCreateParentDirs) throws IOException {
    return this.create(this.getS3Pair(url), forceCreateParentDirs);
  }

  /**
   * Opens an {@link OutputStream} whose bytes are uploaded to S3 by a background thread.
   *
   * <p>The caller must {@code close()} the returned stream to complete the upload.
   *
   * @param forceCreateParentDirs if {@code false}, fail unless the key's parent "folders"
   *     already exist (S3 has no real directories; see {@link #preFoldersExits})
   * @throws IOException if the parent folders are required but missing
   */
  private OutputStream create(S3Pair pair, Boolean forceCreateParentDirs) throws IOException {
    if (!forceCreateParentDirs) {
      if (!this.preFoldersExits(pair)) {
        throw new IOException(
            String.format(
                "The folder '%s' does not exist in the bucket '%s'",
                pair.getKey(),
                pair.getBucket()
            )
        );
      }
    }

    PipedInputStream in = new PipedInputStream();
    final PipedOutputStream out = new PipedOutputStream(in);

    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentType("text/plain");
    AmazonS3 s3Client = this.s3;
    Thread uploader = new Thread(() -> {
      try {
        s3Client.putObject(pair.getBucket(), pair.getKey(), in, metadata);
      } catch (RuntimeException e) {
        // BUG FIX: the original swallowed upload failures, leaving the writing side
        // blocked on the pipe forever. Closing the read end makes the writer's next
        // write fail with an IOException instead.
        try {
          in.close();
        } catch (IOException ignored) {
          // Nothing more we can do; the pipe is already broken.
        }
      }
    }, "wayang-s3-upload");
    uploader.start();
    return out;
  }

  /** Tests whether the pair's bucket exists (name kept for interface compatibility). */
  public boolean bucketExits(S3Pair pair) {
    return this.s3.doesBucketExistV2(pair.getBucket());
  }

  /**
   * Tests whether every parent "folder" of the pair's key exists, i.e. each proper prefix
   * of the key has at least one object under it.
   */
  public boolean preFoldersExits(S3Pair pair) {
    if (!this.s3.doesBucketExistV2(pair.getBucket())) {
      return false;
    }
    // BUG FIX: the original prepended "/" to every aggregated prefix (S3 keys carry no
    // leading slash) and also required the terminal component — the object itself — to be
    // a folder, so the check failed for any real key. Only the parent components matter.
    String[] components = pair.getKey().split("/");
    StringBuilder prefix = new StringBuilder();
    for (int i = 0; i < components.length - 1; i++) {
      if (prefix.length() > 0) {
        prefix.append('/');
      }
      prefix.append(components[i]);
      if (!isDirectory(new S3Pair(pair.getBucket(), prefix.toString()))) {
        return false;
      }
    }
    return true;
  }

  @Override
  public boolean isDirectory(String url) {
    return this.isDirectory(this.getS3Pair(url));
  }

  /**
   * Treats a key as a directory when listing it as a prefix yields at least one object
   * other than the key itself (S3 has no first-class directories).
   */
  private boolean isDirectory(S3Pair pair) {
    if (!this.bucketExits(pair)) {
      return false;
    }
    String key = pair.getKey();
    return listChildren(pair).stream().anyMatch(name -> !name.equals(key));
  }

  @Override
  public Collection<String> listChildren(String url) {
    return this.listChildren(this.getS3Pair(url));
  }

  /**
   * Lists the keys of all objects under the pair's key prefix.
   *
   * <p>NOTE(review): listObjects returns at most one page (~1000 keys); very large
   * prefixes would need pagination — confirm whether that matters for callers.
   */
  private Collection<String> listChildren(S3Pair pair) {
    ObjectListing listing = this.s3.listObjects(pair.getBucket(), pair.getKey());
    return listing.getObjectSummaries().stream()
        .map(obj -> obj.getKey())
        .collect(Collectors.toList());
  }

  @Override
  public boolean delete(String url, boolean isRecursiveDelete) throws IOException {
    return this.delete(this.getS3Pair(url), isRecursiveDelete);
  }

  /**
   * Deletes the object for the pair's key.
   *
   * @throws IOException if the key is a directory and {@code isRecursiveDelete} is false
   */
  private boolean delete(S3Pair pair, boolean isRecursiveDelete) throws IOException {
    if (!isRecursiveDelete) {
      if (isDirectory(pair)) {
        throw new IOException("the path correspond to a directory");
      }
    }
    this.s3.deleteObject(pair.getBucket(), pair.getKey());
    return true;
  }
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index 9b574afc..471275ab 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -83,7 +83,12 @@ public class SparkPlatform extends Platform {
             "spark.io.compression.codec",
             "spark.driver.memory",
             "spark.executor.heartbeatInterval",
-            "spark.network.timeout"
+            "spark.network.timeout",
+    };
+
+    private static final String[] OPTIONAL_HADOOP_PROPERTIES = {
+        "fs.s3.awsAccessKeyId",
+        "fs.s3.awsSecretAccessKey"
     };
 
     /**
@@ -121,6 +126,7 @@ public class SparkPlatform extends Platform {
                     "There is already a SparkContext (master: {}): , which will be reused. " +
                             "Not all settings might be effective.", sparkContext.getConf().get("spark.master"));
             sparkConf = sparkContext.getConf();
+
         } else {
             sparkConf = new SparkConf(true);
         }
@@ -133,6 +139,7 @@ public class SparkPlatform extends Platform {
                     value -> sparkConf.set(property, value)
             );
         }
+
         if (job.getName() != null) {
             sparkConf.set("spark.app.name", job.getName());
         }
@@ -142,6 +149,14 @@ public class SparkPlatform extends Platform {
         }
         final JavaSparkContext sparkContext = this.sparkContextReference.get();
 
+        org.apache.hadoop.conf.Configuration hadoopconf = sparkContext.hadoopConfiguration();
+        for (String property: OPTIONAL_HADOOP_PROPERTIES){
+            System.out.println(property);
+            configuration.getOptionalStringProperty(property).ifPresent(
+                value -> hadoopconf.set(property, value)
+            );
+        }
+
         // Set up the JAR files.
         //sparkContext.clearJars();
         if (!sparkContext.isLocal()) {
diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml
index 32c3625f..2fcfbfc2 100644
--- a/wayang-platforms/wayang-spark/pom.xml
+++ b/wayang-platforms/wayang-spark/pom.xml
@@ -71,7 +71,17 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
-            <version>2.7.7</version>
+            <version>3.1.2</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-s3</artifactId>
+          <version>1.12.253</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>3.1.2</version>
         </dependency>
     </dependencies>