You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/07/05 10:59:27 UTC
[incubator-wayang] 03/03: [WAYANG-S3] support for s3 in apache wayang
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch s3
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit f232a5ed35da32a549baaa6ea7db962b2ea1f483
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Tue Jul 5 12:58:38 2022 +0200
[WAYANG-S3] support for s3 in apache wayang
Signed-off-by: bertty <be...@apache.org>
---
pom.xml | 2 +-
wayang-benchmark/pom.xml | 5 +
wayang-commons/pom.xml | 8 +
wayang-commons/wayang-core/pom.xml | 22 +-
.../apache/wayang/core/util/fs/FileSystems.java | 3 +-
.../apache/wayang/core/util/fs/S3FileSystem.java | 232 +++++++++++++++++++++
.../wayang/spark/platform/SparkPlatform.java | 17 +-
wayang-platforms/wayang-spark/pom.xml | 12 +-
8 files changed, 296 insertions(+), 5 deletions(-)
diff --git a/pom.xml b/pom.xml
index 36ab3ede..b506f3b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,7 @@
<mockito.version>3.5.10</mockito.version>
<mockk.version>1.10.0</mockk.version>
<external.platforms.scope>provided</external.platforms.scope>
- <hadoop.version>2.7.7</hadoop.version>
+ <hadoop.version>3.1.2</hadoop.version>
<!-- To be overridden by individual modules -->
<java-module-name>org.apache.wayang.default</java-module-name>
<code.coverage.project.folder>${basedir}/</code.coverage.project.folder>
diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml
index f230ee09..7df47adf 100644
--- a/wayang-benchmark/pom.xml
+++ b/wayang-benchmark/pom.xml
@@ -59,6 +59,11 @@
<artifactId>wayang-sqlite3</artifactId>
<version>0.6.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>3.1.2</version>
+ </dependency>
</dependencies>
<modules>
diff --git a/wayang-commons/pom.xml b/wayang-commons/pom.xml
index fdb0e20f..29941082 100644
--- a/wayang-commons/pom.xml
+++ b/wayang-commons/pom.xml
@@ -81,6 +81,10 @@
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -165,6 +169,10 @@
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/wayang-commons/wayang-core/pom.xml b/wayang-commons/wayang-core/pom.xml
index 780c04ad..b170a49d 100644
--- a/wayang-commons/wayang-core/pom.xml
+++ b/wayang-commons/wayang-core/pom.xml
@@ -37,7 +37,17 @@
<properties>
<java-module-name>org.apache.wayang.core</java-module-name>
</properties>
-
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bom</artifactId>
+ <version>1.12.253</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
@@ -93,6 +103,16 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.12.253</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.13</version>
+ </dependency>
</dependencies>
</project>
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
index 21ae249e..087a64d2 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
@@ -45,7 +45,8 @@ public class FileSystems {
private static Collection<FileSystem> registeredFileSystems = Arrays.asList(
new LocalFileSystem(),
- new HadoopFileSystem()
+ new HadoopFileSystem(),
+ new S3FileSystem()
);
private FileSystems() {
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
new file mode 100644
index 00000000..99fe3cb8
--- /dev/null
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
@@ -0,0 +1,232 @@
+package org.apache.wayang.core.util.fs;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.wayang.core.api.exception.WayangException;
+
+/**
+ * {@link FileSystem} implementation backed by Amazon S3, using the AWS SDK v1 client.
+ *
+ * <p>Credentials are taken from the JVM system properties {@code fs.s3.awsAccessKeyId} /
+ * {@code fs.s3.awsSecretAccessKey} when both are set; otherwise the SDK's default
+ * credential chain (environment variables, instance profile, ...) is used.</p>
+ *
+ * <p>Not thread-safe: the URL-parse cache is an unsynchronized {@link HashMap}.</p>
+ */
+public class S3FileSystem implements FileSystem {
+
+    /** Shared S3 client, configured once at construction time. */
+    final AmazonS3 s3;
+
+    /** Cache of already-parsed URLs, to avoid re-splitting the same URL repeatedly. */
+    final Map<String, S3Pair> pairs = new HashMap<>();
+
+    public S3FileSystem(){
+        // NOTE: the original code called Properties.contains(...), which (per
+        // java.util.Hashtable) tests *values*, not keys — so explicit credentials
+        // were never detected. containsKey is the correct check.
+        if(
+            System.getProperties().containsKey("fs.s3.awsAccessKeyId") &&
+            System.getProperties().containsKey("fs.s3.awsSecretAccessKey")
+        ){
+            BasicAWSCredentials awsCreds = new BasicAWSCredentials(
+                System.getProperty("fs.s3.awsAccessKeyId"),
+                System.getProperty("fs.s3.awsSecretAccessKey")
+            );
+            this.s3 = AmazonS3ClientBuilder.standard()
+                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                .build();
+        }else{
+            // Fall back to the SDK's default credential/region chain.
+            this.s3 = AmazonS3ClientBuilder.defaultClient();
+        }
+    }
+
+    /** Manual smoke test against a live bucket; not part of the public API. */
+    public static void main(String... args) throws IOException {
+        S3FileSystem s3 = new S3FileSystem();
+        String url = "s3://blossom-benchmark/lulu/lolo/lala";
+        System.out.println(url);
+        System.out.println(s3.getS3Pair(url).getBucket());
+        System.out.println(s3.getS3Pair(url).getKey());
+        System.out.println(s3.preFoldersExits(s3.getS3Pair(url)));
+
+        OutputStream output = s3.create(url, true);
+        byte[] bytes = "lala".getBytes();
+        output.write(bytes);
+        output.flush();
+        output.close();
+    }
+
+    /**
+     * Immutable (bucket, key) pair parsed from an {@code s3://bucket/key} style URL.
+     */
+    class S3Pair{
+
+        private final String bucket;
+        private final String key;
+
+        public S3Pair(S3FileSystem s3Client, String url){
+            if( ! s3Client.canHandle(url)){
+                throw new WayangException(
+                    "The url '" + url + "' cannot be handled by " + S3FileSystem.class.getSimpleName()
+                );
+            }
+            // "s3a://bucket/key" splits into ["s3a:", "", "bucket", "key"];
+            // a bucket-only URL ("s3://bucket") yields an empty key.
+            String[] parts = url.split("/", 4);
+            this.bucket = parts[2];
+            this.key = (parts.length == 4) ? parts[3] : "";
+        }
+
+        public S3Pair(String bucket, String key){
+            this.bucket = bucket;
+            this.key = key;
+        }
+
+        public String getBucket() {
+            return bucket;
+        }
+
+        public String getKey() {
+            return key;
+        }
+    }
+
+    /** Parses {@code url} into an {@link S3Pair}, caching the result. */
+    private S3Pair getS3Pair(String url){
+        return this.pairs.computeIfAbsent(url, u -> new S3Pair(this, u));
+    }
+
+    @Override
+    public long getFileSize(String fileUrl) throws FileNotFoundException {
+        return this.getFileSize(this.getS3Pair(fileUrl));
+    }
+
+    private long getFileSize(S3Pair pair) throws FileNotFoundException {
+        // NOTE(review): a missing object surfaces as an SDK AmazonS3Exception,
+        // not the declared FileNotFoundException — callers should be aware.
+        return this.s3.getObjectMetadata(pair.getBucket(), pair.getKey()).getContentLength();
+    }
+
+    @Override
+    public boolean canHandle(String url) {
+        // Guard against URLs shorter than the scheme prefix — the original
+        // substring(0, 5) threw StringIndexOutOfBoundsException on them.
+        // Accept the common S3 schemes; the original accepted only "s3a:/",
+        // although this class's own demo (main) uses "s3://".
+        if (url == null) return false;
+        String lower = url.toLowerCase();
+        return lower.startsWith("s3://") || lower.startsWith("s3a://") || lower.startsWith("s3n://");
+    }
+
+    @Override
+    public InputStream open(String url) throws IOException {
+        return this.open(this.getS3Pair(url));
+    }
+
+    private InputStream open(S3Pair pair) throws IOException {
+        return this.s3.getObject(pair.getBucket(), pair.getKey()).getObjectContent();
+    }
+
+    @Override
+    public OutputStream create(String url) throws IOException {
+        return this.create(this.getS3Pair(url));
+    }
+
+    private OutputStream create(S3Pair pair) throws IOException {
+        return this.create(pair, false);
+    }
+
+    @Override
+    public OutputStream create(String url, Boolean forceCreateParentDirs) throws IOException {
+        return this.create(this.getS3Pair(url), forceCreateParentDirs);
+    }
+
+    /**
+     * Opens an {@link OutputStream} whose content is streamed to S3 via a piped
+     * upload running on a background thread (piped streams deadlock when the
+     * reader and writer share a thread).
+     *
+     * @param forceCreateParentDirs when {@code false}, require the parent "folders"
+     *                              of the key to already exist
+     */
+    private OutputStream create(S3Pair pair, Boolean forceCreateParentDirs) throws IOException {
+        if( ! forceCreateParentDirs && ! this.preFoldersExits(pair) ){
+            throw new IOException(
+                String.format(
+                    "The folder '%s' does not exist in the bucket '%s'",
+                    pair.getKey(),
+                    pair.getBucket()
+                )
+            );
+        }
+
+        PipedInputStream in = new PipedInputStream();
+        final PipedOutputStream out = new PipedOutputStream(in);
+
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setContentType("text/plain");
+        final AmazonS3 s3Client = this.s3;
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    s3Client.putObject(pair.getBucket(), pair.getKey(), in, metadata);
+                } catch (RuntimeException e) {
+                    // Surface upload failures instead of silently losing the data;
+                    // the writing side only sees a broken pipe otherwise.
+                    e.printStackTrace();
+                }
+            }
+        }, "s3-upload-" + pair.getKey()).start();
+        return out;
+    }
+
+    public boolean bucketExits(S3Pair pair){
+        return this.s3.doesBucketExistV2(pair.getBucket());
+    }
+
+    /**
+     * Checks that every *parent* component of the key exists as a "folder"
+     * (S3 has no real directories; a folder exists when objects are listed
+     * under its prefix). Name keeps the original (misspelled) public signature.
+     */
+    public boolean preFoldersExits(S3Pair pair){
+        if( ! this.s3.doesBucketExistV2(pair.getBucket()) ) return false;
+        String[] keys = pair.getKey().split("/");
+        // S3 keys carry no leading '/' (the original prepended one, so lookups
+        // never matched), and the final component is the object itself, not a folder.
+        StringBuilder aggregated = new StringBuilder();
+        for(int i = 0; i < keys.length - 1; i++){
+            if(aggregated.length() > 0) aggregated.append('/');
+            aggregated.append(keys[i]);
+            if( ! isDirectory(new S3Pair(pair.getBucket(), aggregated.toString())) ){
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isDirectory(String url) {
+        return this.isDirectory(this.getS3Pair(url));
+    }
+
+    /** A prefix is a "directory" when some other object is listed under it. */
+    private boolean isDirectory(S3Pair pair) {
+        if( ! this.bucketExits(pair) ) return false;
+
+        final String key = pair.getKey();
+        return listChildren(pair).stream().anyMatch(name -> ! name.equals(key));
+    }
+
+    @Override
+    public Collection<String> listChildren(String url) {
+        return this.listChildren(this.getS3Pair(url));
+    }
+
+    private Collection<String> listChildren(S3Pair pair) {
+        // NOTE(review): listObjects returns at most one page (~1000 keys);
+        // larger prefixes need listNextBatchOfObjects pagination.
+        ObjectListing listing = this.s3.listObjects(pair.getBucket(), pair.getKey());
+        return listing.getObjectSummaries().stream()
+            .map(obj -> obj.getKey())
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean delete(String url, boolean isRecursiveDelete) throws IOException {
+        return this.delete(this.getS3Pair(url), isRecursiveDelete);
+    }
+
+    private boolean delete(S3Pair pair, boolean isRecursiveDelete) throws IOException {
+        if(isDirectory(pair)){
+            if(!isRecursiveDelete){
+                throw new IOException("the path correspond to a directory");
+            }
+            // Recursive delete: remove every object under the prefix; the original
+            // deleted only the single key, leaving the "directory" contents behind.
+            for(String child : this.listChildren(pair)){
+                this.s3.deleteObject(pair.getBucket(), child);
+            }
+        }
+        this.s3.deleteObject(pair.getBucket(), pair.getKey());
+        return true;
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index 9b574afc..471275ab 100644
--- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -83,7 +83,12 @@ public class SparkPlatform extends Platform {
"spark.io.compression.codec",
"spark.driver.memory",
"spark.executor.heartbeatInterval",
- "spark.network.timeout"
+ "spark.network.timeout",
+ };
+
+ private static final String[] OPTIONAL_HADOOP_PROPERTIES = {
+ "fs.s3.awsAccessKeyId",
+ "fs.s3.awsSecretAccessKey"
};
/**
@@ -121,6 +126,7 @@ public class SparkPlatform extends Platform {
"There is already a SparkContext (master: {}): , which will be reused. " +
"Not all settings might be effective.", sparkContext.getConf().get("spark.master"));
sparkConf = sparkContext.getConf();
+
} else {
sparkConf = new SparkConf(true);
}
@@ -133,6 +139,7 @@ public class SparkPlatform extends Platform {
value -> sparkConf.set(property, value)
);
}
+
if (job.getName() != null) {
sparkConf.set("spark.app.name", job.getName());
}
@@ -142,6 +149,14 @@ public class SparkPlatform extends Platform {
}
final JavaSparkContext sparkContext = this.sparkContextReference.get();
+ org.apache.hadoop.conf.Configuration hadoopconf = sparkContext.hadoopConfiguration();
+ for (String property: OPTIONAL_HADOOP_PROPERTIES){
+ System.out.println(property);
+ configuration.getOptionalStringProperty(property).ifPresent(
+ value -> hadoopconf.set(property, value)
+ );
+ }
+
// Set up the JAR files.
//sparkContext.clearJars();
if (!sparkContext.isLocal()) {
diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml
index 32c3625f..2fcfbfc2 100644
--- a/wayang-platforms/wayang-spark/pom.xml
+++ b/wayang-platforms/wayang-spark/pom.xml
@@ -71,7 +71,17 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
- <version>2.7.7</version>
+ <version>3.1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.12.253</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>3.1.2</version>
</dependency>
</dependencies>