Posted to commits@wayang.apache.org by be...@apache.org on 2022/07/05 10:59:24 UTC

[incubator-wayang] branch s3 created (now f232a5ed)

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a change to branch s3
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


      at f232a5ed [WAYANG-S3] support for s3 in apache wayang

This branch includes the following new commits:

     new 2aa6937c [WAYANG-211] move script to build
     new f45b3bad Merge remote-tracking branch 'apache/main' into main-tmp
     new f232a5ed [WAYANG-S3] support for s3 in apache wayang

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-wayang] 02/03: Merge remote-tracking branch 'apache/main' into main-tmp

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch s3
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f45b3bad787c58af6dddc0668f90a34d3bb4c231
Merge: 2aa6937c c3ae47a0
Author: bertty <be...@apache.org>
AuthorDate: Mon Jul 4 22:08:17 2022 +0200

    Merge remote-tracking branch 'apache/main' into main-tmp

 .asf.yaml                        |  12 ++++
 .github/workflows/todo2issue.yml |   6 +-
 README.md                        | 140 +++++++++++++++------------------------
 bin/check-license.sh             |  61 +++++++++++++++++
 4 files changed, 130 insertions(+), 89 deletions(-)


[incubator-wayang] 03/03: [WAYANG-S3] support for s3 in apache wayang

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch s3
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f232a5ed35da32a549baaa6ea7db962b2ea1f483
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Tue Jul 5 12:58:38 2022 +0200

    [WAYANG-S3] support for s3 in apache wayang
    
    Signed-off-by: bertty <be...@apache.org>
---
 pom.xml                                            |   2 +-
 wayang-benchmark/pom.xml                           |   5 +
 wayang-commons/pom.xml                             |   8 +
 wayang-commons/wayang-core/pom.xml                 |  22 +-
 .../apache/wayang/core/util/fs/FileSystems.java    |   3 +-
 .../apache/wayang/core/util/fs/S3FileSystem.java   | 232 +++++++++++++++++++++
 .../wayang/spark/platform/SparkPlatform.java       |  17 +-
 wayang-platforms/wayang-spark/pom.xml              |  12 +-
 8 files changed, 296 insertions(+), 5 deletions(-)
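
For orientation before the diffs, here is a minimal usage sketch of the new
S3FileSystem (hypothetical, not part of the commit: the bucket and key are
made up, but the two credential properties are exactly the ones the
constructor below reads; when they are absent, the AWS SDK falls back to its
default credential chain):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.wayang.core.util.fs.S3FileSystem;

    public class S3ReadSketch {
        public static void main(String[] args) throws Exception {
            // Same system properties that S3FileSystem's constructor checks.
            System.setProperty("fs.s3.awsAccessKeyId", "<access-key>");
            System.setProperty("fs.s3.awsSecretAccessKey", "<secret-key>");

            S3FileSystem fs = new S3FileSystem();
            String url = "s3://some-bucket/data/input.csv"; // hypothetical
            if (fs.canHandle(url)) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(fs.open(url)))) {
                    reader.lines().forEach(System.out::println);
                }
            }
        }
    }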

diff --git a/pom.xml b/pom.xml
index 36ab3ede..b506f3b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,7 @@
         <mockito.version>3.5.10</mockito.version>
         <mockk.version>1.10.0</mockk.version>
         <external.platforms.scope>provided</external.platforms.scope>
-        <hadoop.version>2.7.7</hadoop.version>
+        <hadoop.version>3.1.2</hadoop.version>
         <!-- To be overridden by individual modules -->
         <java-module-name>org.apache.wayang.default</java-module-name>
         <code.coverage.project.folder>${basedir}/</code.coverage.project.folder>
diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml
index f230ee09..7df47adf 100644
--- a/wayang-benchmark/pom.xml
+++ b/wayang-benchmark/pom.xml
@@ -59,6 +59,11 @@
       <artifactId>wayang-sqlite3</artifactId>
       <version>0.6.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>3.1.2</version>
+    </dependency>
   </dependencies>
 
   <modules>
diff --git a/wayang-commons/pom.xml b/wayang-commons/pom.xml
index fdb0e20f..29941082 100644
--- a/wayang-commons/pom.xml
+++ b/wayang-commons/pom.xml
@@ -81,6 +81,10 @@
                         <groupId>com.thoughtworks.paranamer</groupId>
                         <artifactId>paranamer</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>commons-httpclient</groupId>
+                        <artifactId>commons-httpclient</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
@@ -165,6 +169,10 @@
                         <groupId>com.thoughtworks.paranamer</groupId>
                         <artifactId>paranamer</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>commons-httpclient</groupId>
+                        <artifactId>commons-httpclient</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
diff --git a/wayang-commons/wayang-core/pom.xml b/wayang-commons/wayang-core/pom.xml
index 780c04ad..b170a49d 100644
--- a/wayang-commons/wayang-core/pom.xml
+++ b/wayang-commons/wayang-core/pom.xml
@@ -37,7 +37,17 @@
     <properties>
         <java-module-name>org.apache.wayang.core</java-module-name>
     </properties>
-
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>1.12.253</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
     <dependencies>
         <dependency>
             <groupId>org.yaml</groupId>
@@ -93,6 +103,16 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>1.12.253</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
index 21ae249e..087a64d2 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
@@ -45,7 +45,8 @@ public class FileSystems {
 
     private static Collection<FileSystem> registeredFileSystems = Arrays.asList(
             new LocalFileSystem(),
-            new HadoopFileSystem()
+            new HadoopFileSystem(),
+            new S3FileSystem()
     );
 
     private FileSystems() {
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
new file mode 100644
index 00000000..99fe3cb8
--- /dev/null
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
@@ -0,0 +1,232 @@
+package org.apache.wayang.core.util.fs;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.wayang.core.api.exception.WayangException;
+
+public class S3FileSystem implements FileSystem {
+
+  final AmazonS3 s3;
+
+  final Map<String, S3Pair> pairs = new HashMap<>();
+
+  public S3FileSystem(){
+
+    if(
+        System.getProperties().containsKey("fs.s3.awsAccessKeyId") &&
+        System.getProperties().containsKey("fs.s3.awsSecretAccessKey")
+    ){
+      BasicAWSCredentials awsCreds = new BasicAWSCredentials(
+                                              System.getProperty("fs.s3.awsAccessKeyId"),
+                                              System.getProperty("fs.s3.awsSecretAccessKey")
+      );
+      this.s3 = AmazonS3ClientBuilder.standard()
+          .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+          .build();
+    }else{
+      this.s3 = AmazonS3ClientBuilder.defaultClient();
+    }
+  }
+
+  public static void main(String... args) throws IOException {
+    S3FileSystem s3 = new S3FileSystem();
+    //String url = "s3://blossom-benchmark/HIGGS.csv";
+    //String url = "s3://blossom-benchmark/README.md";
+    String url = "s3://blossom-benchmark/lulu/lolo/lala";
+    System.out.println(url);
+    System.out.println(s3.getS3Pair(url).getBucket());
+    System.out.println(s3.getS3Pair(url).getKey());
+    System.out.println(s3.preFoldersExits(s3.getS3Pair(url)));
+
+    //System.out.println(s3.getFileSize(url));
+    //InputStream content = s3.open(url);
+    //new BufferedReader(new InputStreamReader(content)).lines().forEach(System.out::println);
+    //System.out.println(s3.listChildren(url));
+    //System.out.println(s3.isDirectory(url));
+    OutputStream output = s3.create(url, true);
+    byte[] bytes = "lala".getBytes();
+    output.write(bytes);
+    output.flush();
+    output.close();
+  }
+
+  class S3Pair{
+
+    private final String bucket;
+    private final String key;
+
+    public S3Pair(S3FileSystem s3Client, String url){
+      if( ! s3Client.canHandle(url)){
+        throw new WayangException("The files can not be handle by "+this.getClass().getSimpleName());
+      }
+      String[] parts = url.split("/", 4);
+      String key_tmp = "";
+      if(parts.length == 4) {
+        key_tmp = parts[3];
+      }
+      this.bucket = parts[2];
+      this.key = key_tmp;
+    }
+
+    public S3Pair(String bucket, String key){
+      this.bucket = bucket;
+      this.key = key;
+    }
+
+    public String getBucket() {
+      return bucket;
+    }
+
+    public String getKey() {
+      return key;
+    }
+  }
+
+  private S3Pair getS3Pair(String url){
+    S3Pair pair = this.pairs.get(url);
+    if(pair == null){
+      pair = new S3Pair(this, url);
+      this.pairs.put(url, pair);
+    }
+    return pair;
+  }
+
+  @Override
+  public long getFileSize(String fileUrl) throws FileNotFoundException {
+    return this.getFileSize(this.getS3Pair(fileUrl));
+  }
+
+  private long getFileSize(S3Pair pair) throws FileNotFoundException {
+    return this.s3.getObjectMetadata(pair.getBucket(), pair.getKey()).getContentLength();
+  }
+
+  @Override
+  public boolean canHandle(String url) {
+    // Accept both "s3://" and "s3a://" URLs (the examples in main() use "s3://");
+    // startsWith also avoids the out-of-bounds error substring(0, 5) caused on short URLs.
+    String url_lower = url.toLowerCase();
+    return url_lower.startsWith("s3:/") || url_lower.startsWith("s3a:/");
+  }
+
+  @Override
+  public InputStream open(String url) throws IOException {
+    return this.open(this.getS3Pair(url));
+  }
+
+  private InputStream open(S3Pair pair) throws IOException {
+    return this.s3.getObject(pair.getBucket(), pair.getKey()).getObjectContent();
+  }
+
+  @Override
+  public OutputStream create(String url) throws IOException {
+    return this.create(this.getS3Pair(url));
+  }
+
+  private OutputStream create(S3Pair pair) throws IOException {
+    return this.create(pair, false);
+  }
+
+  @Override
+  public OutputStream create(String url, Boolean forceCreateParentDirs) throws IOException {
+    return this.create(this.getS3Pair(url), forceCreateParentDirs);
+  }
+
+  private OutputStream create(S3Pair pair, Boolean forceCreateParentDirs) throws IOException {
+    if( ! forceCreateParentDirs ){
+      if ( ! this.preFoldersExits(pair) )
+        throw new IOException(
+            String.format(
+              "The folder '%s' does not exist in the bucket '%s'",
+              pair.getKey(),
+              pair.getBucket()
+            )
+        );
+    }
+
+    PipedInputStream in = new PipedInputStream();
+    final PipedOutputStream out = new PipedOutputStream(in);
+
+    ObjectMetadata metadata = new ObjectMetadata();
+    metadata.setContentType("text/plain");
+    AmazonS3 s3Client = this.s3;
+    // Upload on a background thread so the caller can write into the returned
+    // stream while putObject consumes the piped input; note that the SDK buffers
+    // the stream in memory because no content length is set on the metadata.
+    new Thread(() -> s3Client.putObject(pair.getBucket(), pair.getKey(), in, metadata)).start();
+    return out;
+  }
+
+  public boolean bucketExits(S3Pair pair){
+    return this.s3.doesBucketExistV2(pair.getBucket());
+  }
+
+  public boolean preFoldersExits(S3Pair pair){
+    if( ! this.s3.doesBucketExistV2(pair.getBucket()) ) return false;
+    // Check every parent prefix of the key (e.g. "lulu/" and "lulu/lolo/" for
+    // "lulu/lolo/lala"); S3 keys do not start with a slash.
+    String[] keys = pair.getKey().split("/");
+    String aggregated = "";
+    for(int i = 0; i < keys.length - 1; i++){
+      aggregated = aggregated + keys[i] + "/";
+      if( ! isDirectory(new S3Pair(pair.getBucket(), aggregated)) ){
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isDirectory(String url) {
+    return this.isDirectory(this.getS3Pair(url));
+  }
+
+  private boolean isDirectory(S3Pair pair) {
+    if( ! this.bucketExits(pair) ) return false;
+
+    // A prefix counts as a directory if at least one object other than the
+    // prefix itself lies under it.
+    String key = pair.getKey();
+    return listChildren(pair).stream().anyMatch(name -> ! name.equals(key));
+  }
+
+  @Override
+  public Collection<String> listChildren(String url) {
+    return this.listChildren(this.getS3Pair(url));
+  }
+
+  private Collection<String> listChildren(S3Pair pair) {
+    ObjectListing listing = this.s3.listObjects(pair.getBucket(), pair.getKey());
+    return listing.getObjectSummaries().stream()
+        .map(obj -> obj.getKey())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean delete(String url, boolean isRecursiveDelete) throws IOException {
+    return this.delete(this.getS3Pair(url), isRecursiveDelete);
+  }
+
+  private boolean delete(S3Pair pair, boolean isRecursiveDelete) throws IOException {
+    if(isDirectory(pair)){
+      if(!isRecursiveDelete){
+        throw new IOException("the path corresponds to a directory; use a recursive delete");
+      }
+      // A directory is only a key prefix, so delete every object under it.
+      for(String key : this.listChildren(pair)){
+        this.s3.deleteObject(pair.getBucket(), key);
+      }
+      return true;
+    }
+    this.s3.deleteObject(pair.getBucket(), pair.getKey());
+    return true;
+  }
+}
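
A design note on create() above: it returns a PipedOutputStream whose paired
PipedInputStream is consumed by putObject on a background thread, so writing
and uploading overlap, and the upload completes once the caller closes the
stream. A minimal write sketch (same hypothetical bucket as before):

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.wayang.core.util.fs.S3FileSystem;

    public class S3WriteSketch {
        public static void main(String[] args) throws Exception {
            S3FileSystem fs = new S3FileSystem();
            // "true" skips the check that all parent prefixes already exist.
            try (OutputStream out = fs.create("s3://some-bucket/dir/out.txt", true)) {
                out.write("hello from wayang".getBytes(StandardCharsets.UTF_8));
            } // closing the stream lets the background putObject finish
        }
    }
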
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index 9b574afc..471275ab 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -83,7 +83,12 @@ public class SparkPlatform extends Platform {
             "spark.io.compression.codec",
             "spark.driver.memory",
             "spark.executor.heartbeatInterval",
-            "spark.network.timeout"
+            "spark.network.timeout",
+    };
+
+    private static final String[] OPTIONAL_HADOOP_PROPERTIES = {
+        "fs.s3.awsAccessKeyId",
+        "fs.s3.awsSecretAccessKey"
     };
 
     /**
@@ -121,6 +126,7 @@ public class SparkPlatform extends Platform {
                     "There is already a SparkContext (master: {}): , which will be reused. " +
                             "Not all settings might be effective.", sparkContext.getConf().get("spark.master"));
             sparkConf = sparkContext.getConf();
+
         } else {
             sparkConf = new SparkConf(true);
         }
@@ -133,6 +139,7 @@ public class SparkPlatform extends Platform {
                     value -> sparkConf.set(property, value)
             );
         }
+
         if (job.getName() != null) {
             sparkConf.set("spark.app.name", job.getName());
         }
@@ -142,6 +149,14 @@ public class SparkPlatform extends Platform {
         }
         final JavaSparkContext sparkContext = this.sparkContextReference.get();
 
+        org.apache.hadoop.conf.Configuration hadoopconf = sparkContext.hadoopConfiguration();
+        for (String property: OPTIONAL_HADOOP_PROPERTIES){
+            configuration.getOptionalStringProperty(property).ifPresent(
+                value -> hadoopconf.set(property, value)
+            );
+        }
+
         // Set up the JAR files.
         //sparkContext.clearJars();
         if (!sparkContext.isLocal()) {
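
The loop added above copies any of the optional fs.s3.* keys from the Wayang
configuration into Spark's Hadoop configuration before the job launches. A
sketch of how a job would supply them (assuming the usual
Configuration#setProperty setter, the same mechanism that feeds the required
Spark properties in this class; the values are placeholders):

    import org.apache.wayang.core.api.Configuration;
    import org.apache.wayang.core.api.WayangContext;

    public class S3SparkConfigSketch {
        public static void main(String[] args) {
            Configuration configuration = new Configuration();
            // Same keys as OPTIONAL_HADOOP_PROPERTIES above.
            configuration.setProperty("fs.s3.awsAccessKeyId", "<access-key>");
            configuration.setProperty("fs.s3.awsSecretAccessKey", "<secret-key>");
            // SparkPlatform mirrors these into sparkContext.hadoopConfiguration().
            WayangContext wayangContext = new WayangContext(configuration);
        }
    }
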
diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml
index 32c3625f..2fcfbfc2 100644
--- a/wayang-platforms/wayang-spark/pom.xml
+++ b/wayang-platforms/wayang-spark/pom.xml
@@ -71,7 +71,17 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
-            <version>2.7.7</version>
+            <version>3.1.2</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-s3</artifactId>
+          <version>1.12.253</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>3.1.2</version>
         </dependency>
     </dependencies>
 


[incubator-wayang] 01/03: [WAYANG-211] move script to build

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch s3
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 2aa6937ce6f07f2f8e1ac0346c3d54b4e2d60207
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Jul 4 16:13:33 2022 +0200

    [WAYANG-211] move script to build
    
    Signed-off-by: bertty <be...@apache.org>
---
 {bin => build}/build.sh                           | 0
 {bin => build}/change-scala-version.sh            | 0
 {bin => build}/check-license.sh                   | 0
 {bin => build}/check-release.sh                   | 0
 {bin => build}/contains-scala-dependencies.sh     | 0
 {bin => build}/create_scala_structure.sh          | 0
 {bin => build}/detect-scala-dependent-projects.sh | 0
 {bin => build}/rename_org.sh                      | 0
 {bin => build}/rm-pom-backups.sh                  | 0
 {bin => build}/rollback_release.sh                | 0
 10 files changed, 0 insertions(+), 0 deletions(-)

diff --git a/bin/build.sh b/build/build.sh
similarity index 100%
rename from bin/build.sh
rename to build/build.sh
diff --git a/bin/change-scala-version.sh b/build/change-scala-version.sh
similarity index 100%
rename from bin/change-scala-version.sh
rename to build/change-scala-version.sh
diff --git a/bin/check-license.sh b/build/check-license.sh
similarity index 100%
rename from bin/check-license.sh
rename to build/check-license.sh
diff --git a/bin/check-release.sh b/build/check-release.sh
similarity index 100%
rename from bin/check-release.sh
rename to build/check-release.sh
diff --git a/bin/contains-scala-dependencies.sh b/build/contains-scala-dependencies.sh
similarity index 100%
rename from bin/contains-scala-dependencies.sh
rename to build/contains-scala-dependencies.sh
diff --git a/bin/create_scala_structure.sh b/build/create_scala_structure.sh
similarity index 100%
rename from bin/create_scala_structure.sh
rename to build/create_scala_structure.sh
diff --git a/bin/detect-scala-dependent-projects.sh b/build/detect-scala-dependent-projects.sh
similarity index 100%
rename from bin/detect-scala-dependent-projects.sh
rename to build/detect-scala-dependent-projects.sh
diff --git a/bin/rename_org.sh b/build/rename_org.sh
similarity index 100%
rename from bin/rename_org.sh
rename to build/rename_org.sh
diff --git a/bin/rm-pom-backups.sh b/build/rm-pom-backups.sh
similarity index 100%
rename from bin/rm-pom-backups.sh
rename to build/rm-pom-backups.sh
diff --git a/bin/rollback_release.sh b/build/rollback_release.sh
similarity index 100%
rename from bin/rollback_release.sh
rename to build/rollback_release.sh