Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2017/10/12 17:40:59 UTC

[GitHub] flink pull request #4818: [FLINK-5706] [file systems] Add S3 file systems wi...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/4818

    [FLINK-5706] [file systems] Add S3 file systems without Hadoop dependencies

    ## What is the purpose of the change
    
    This adds two file system implementations that write to S3, so that users can use Flink with S3 without depending on Hadoop and have an alternative to Hadoop's S3 connectors.
    
    Neither is an actual re-implementation; both wrap other implementations and shade their dependencies.
    
    1. The first is a wrapper around Hadoop's s3a file system. By pulling a smaller dependency tree and shading all dependencies away, this keeps the appearance of Flink being Hadoop-free, from a dependency perspective. We can also bump the shaded Hadoop dependency here to pick up improvements to s3a (as in Hadoop 3.0) without causing dependency conflicts.
    
    2. The second S3 file system is from the Presto Project. Initial simple tests seem to indicate that it responds slightly faster and in a somewhat more lightweight manner to write/read/list requests than the Hadoop s3a FS, but it has some semantic differences. For example, creating a directory does not mean that the file system recognizes the directory as being there. The directory is only recognized as existing once files are inserted. For checkpointing, that could even be preferable.
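
The directory behavior described in item 2 can be illustrated with a toy model. This is a minimal sketch, assuming an in-memory key set in place of S3; the class `PrefixDirectoryStore` and its methods are hypothetical and are not the Presto or Hadoop code. It only shows the idea that a directory "exists" when at least one object key carries its prefix.

```java
import java.util.TreeSet;

/** Toy in-memory object store (hypothetical; not the Presto or Hadoop implementation). */
final class PrefixDirectoryStore {

    // Only object keys are stored; there is no separate directory entry.
    private final TreeSet<String> keys = new TreeSet<>();

    /** Records nothing: in this model a directory is implied by the keys below it. */
    boolean mkdirs(String dir) {
        return true;
    }

    /** Stores an object under the given key (contents are omitted in this toy model). */
    void put(String key) {
        keys.add(key);
    }

    /** A directory exists only if at least one stored key starts with its prefix. */
    boolean directoryExists(String dir) {
        String prefix = dir.endsWith("/") ? dir : dir + "/";
        // The smallest key >= prefix starts with the prefix if any key does.
        String candidate = keys.ceiling(prefix);
        return candidate != null && candidate.startsWith(prefix);
    }

    public static void main(String[] args) {
        PrefixDirectoryStore store = new PrefixDirectoryStore();
        store.mkdirs("checkpoints/job-1");
        System.out.println(store.directoryExists("checkpoints/job-1")); // false
        store.put("checkpoints/job-1/chk-1/_metadata");
        System.out.println(store.directoryExists("checkpoints/job-1")); // true
    }
}
```

Code that creates a directory and then immediately checks for it would see `false` in this model until the first file is written, which is the kind of difference callers have to account for.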
    
    Both file systems register themselves under `s3` so as not to overload the `s3n` and `s3a` schemes used by Hadoop.
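
As a rough illustration of what registering under a scheme means, the sketch below looks up a handler by the scheme of a path URI. It is a simplification under stated assumptions: `SchemeLookupSketch`, `register`, and `factoryFor` are hypothetical names, the bucket name is made up, and this is not Flink's actual lookup code.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy scheme-to-factory lookup; all names are hypothetical, not Flink's API. */
final class SchemeLookupSketch {

    private final Map<String, String> factoryByScheme = new HashMap<>();

    /** Associates a URI scheme such as "s3" with a (named) factory. */
    void register(String scheme, String factoryName) {
        factoryByScheme.put(scheme, factoryName);
    }

    /** Returns the factory registered for the scheme of the given path, if any. */
    Optional<String> factoryFor(URI path) {
        return Optional.ofNullable(factoryByScheme.get(path.getScheme()));
    }

    public static void main(String[] args) {
        SchemeLookupSketch lookup = new SchemeLookupSketch();
        lookup.register("s3", "shaded S3 file system");
        // Only the "s3" scheme is claimed; "s3a" is left untouched.
        System.out.println(lookup.factoryFor(URI.create("s3://my-bucket/checkpoints")).isPresent());
        System.out.println(lookup.factoryFor(URI.create("s3a://my-bucket/checkpoints")).isPresent());
    }
}
```

In this model, paths that start with `s3a://` or `s3n://` are not claimed by the new file systems, so whatever handles those schemes today keeps doing so.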
    
    ## Brief change log
    
      - Adds `flink-filesystems/flink-s3-fs-hadoop`
      - Adds `flink-filesystems/flink-s3-fs-presto`
    
    ## Verifying this change
    
    This adds some initial integration tests, which depend on S3 credentials. These credentials are not in the code; they exist only in encrypted form on Travis. The tests can therefore only run in a meaningful way on the `apache/flink` master branch, or in a committer repository where the committer has enabled Travis uploads to S3 (for logs), since the tests here use the same secret credentials.
    
    Since this does not implement the actual S3 communication, we have no tests for that. The tests only test instantiation and whether S3 communication can be established (simple reads/writes to a bucket, listing, etc).
    
    The change can also be verified by building Flink, pulling the respective S3 FS jar from `/opt` into `/lib`, and running a workload that checkpoints or writes to S3.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (**yes** / no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
    
    Proper behavior of the file systems is important; otherwise checkpointing may fail. In some sense we are already relying on the Hadoop project to test its HDFS and S3 connectors properly. This adds a similar dependency.
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
    
    Will add documentation once the details of this feature are agreed upon.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink fs_s3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4818
    
----
commit a0b89ad02d4cb67c4d5e1e28efcb6872af0540e6
Author: Stephan Ewen <se...@apache.org>
Date:   2017-10-06T15:41:00Z

    [FLINK-5706] [file systems] Add S3 file systems without Hadoop dependencies
    
    This adds two file system implementations that write to S3.
    Neither is an actual re-implementation; both wrap other implementations and shade dependencies.
    
    (1) A wrapper around Hadoop's s3a file system. By pulling a smaller dependency tree and
        shading all dependencies away, this keeps the appearance of Flink being Hadoop-free,
        from a dependency perspective.
    
    (2) The second S3 file system is from the Presto Project.
        Initial simple tests seem to indicate that it responds slightly faster
        and in a bit more lightweight manner to write/read/list requests, compared
        to the Hadoop s3a FS, but it has some semantic differences.

----


---

[GitHub] flink pull request #4818: [FLINK-5706] [file systems] Add S3 file systems wi...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4818#discussion_r144372829
  
    --- Diff: flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.fs.s3hadoop;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.FileSystemFactory;
    +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
    +import org.apache.flink.runtime.util.HadoopUtils;
    +
    +import org.apache.hadoop.fs.s3a.S3AFileSystem;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +/**
    + * Simple factory for the S3 file system.
    + */
    +public class S3FileSystemFactory implements FileSystemFactory {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
    +
    +	/** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. */
    +	private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
    +
    +	/** Keys that are replaced (after prefix replacement, to give a more uniform experience
    +	 * across different file system implementations. */
    +	private static final String[][] MIRRORED_CONFIG_KEYS = {
    +			{ "fs.s3a.access-key", "fs.s3a.access.key" },
    +			{ "fs.s3a.secret-key", "fs.s3a.secret.key" }
    --- End diff --
    
    Think about the session key as well; it lets you support temporary credentials.
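
The two Javadoc comments in the quoted diff describe a two-step key translation: prefix replacement, then mirroring of selected keys. The following is a minimal sketch of how such a translation might work, assuming plain maps instead of Flink's and Hadoop's configuration classes; the method `toHadoopKeys` is hypothetical and is not taken from the pull request. The third mirrored entry is also hypothetical: it shows where a session token for temporary credentials could be added, mapping to Hadoop's `fs.s3a.session.token` key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of prefix replacement plus key mirroring; not the code from the pull request. */
final class S3ConfigKeySketch {

    // Copied from the quoted diff.
    private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };

    private static final String[][] MIRRORED_CONFIG_KEYS = {
        { "fs.s3a.access-key", "fs.s3a.access.key" },
        { "fs.s3a.secret-key", "fs.s3a.secret.key" },
        // Hypothetical extra entry for temporary credentials; not in the quoted diff.
        { "fs.s3a.session-token", "fs.s3a.session.token" }
    };

    /** Forwards every key with a known prefix under "fs.s3a.", then applies the mirroring. */
    static Map<String, String> toHadoopKeys(Map<String, String> flinkConfig) {
        Map<String, String> hadoopConfig = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConfig.entrySet()) {
            for (String prefix : CONFIG_PREFIXES) {
                if (entry.getKey().startsWith(prefix)) {
                    String key = "fs.s3a." + entry.getKey().substring(prefix.length());
                    hadoopConfig.put(mirror(key), entry.getValue());
                    break; // at most one prefix applies per key
                }
            }
        }
        return hadoopConfig;
    }

    /** Replaces a dash-style key with its dotted Hadoop counterpart, if one is listed. */
    private static String mirror(String key) {
        for (String[] pair : MIRRORED_CONFIG_KEYS) {
            if (pair[0].equals(key)) {
                return pair[1];
            }
        }
        return key;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConfig = new LinkedHashMap<>();
        flinkConfig.put("s3.access-key", "my-access-key");
        flinkConfig.put("s3.secret-key", "my-secret-key");
        flinkConfig.put("taskmanager.heap.mb", "1024"); // unrelated key, not forwarded
        System.out.println(toHadoopKeys(flinkConfig));
    }
}
```

Keys without one of the listed prefixes are simply dropped in this sketch, so unrelated Flink settings never reach the Hadoop configuration.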


---

[GitHub] flink pull request #4818: [FLINK-5706] [file systems] Add S3 file systems wi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4818


---

[GitHub] flink issue #4818: [FLINK-5706] [file systems] Add S3 file systems without H...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4818
  
    Thanks @steveloughran for the comments.
    
    I am actually using Hadoop 2.8.1 here with AWS SDK 1.11.95.
    The shaded artifacts are only a few MB in size, so this seems okay.


---

[GitHub] flink issue #4818: [FLINK-5706] [file systems] Add S3 file systems without H...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/flink/pull/4818
  
    1. I hope you pick up Hadoop 2.8.1 for this, as it's got a lot of the optimisations
    1. And equally importantly: a later SDK
    1. Though not one of the more recent 1.11 SDKs, where support is yet to ship. That's a single big shaded aws-sdk JAR, so things like joda-time, jackson, guava, etc. don't cause problems; the only cost is 50+ MB more JAR on the classpath.
    
    Test-wise, see how well your client works with a v4 endpoint like Frankfurt, as there you also have to change the endpoint used.
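
For the endpoint change mentioned above, a test setup could derive the regional endpoint from the region name and set it under Hadoop's `fs.s3a.endpoint` key. The sketch below assumes the standard `s3.<region>.amazonaws.com` host pattern and uses `eu-central-1` for Frankfurt; the class `V4EndpointSketch` and its methods are hypothetical helpers, not part of the pull request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that builds endpoint settings for a region such as Frankfurt. */
final class V4EndpointSketch {

    /** Builds the regional endpoint host, assuming the standard AWS host pattern. */
    static String regionalEndpoint(String region) {
        return "s3." + region + ".amazonaws.com";
    }

    /** Returns the s3a setting that points the client at the regional endpoint. */
    static Map<String, String> settingsFor(String region) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("fs.s3a.endpoint", regionalEndpoint(region));
        return settings;
    }

    public static void main(String[] args) {
        // eu-central-1 is the Frankfurt region.
        System.out.println(settingsFor("eu-central-1"));
    }
}
```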
    
    Otherwise, there is nothing obvious I'd flag as dangerous.


---

[GitHub] flink pull request #4818: [FLINK-5706] [file systems] Add S3 file systems wi...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4818#discussion_r144546384
  
    --- Diff: flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.fs.s3hadoop;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.FileSystemFactory;
    +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
    +import org.apache.flink.runtime.util.HadoopUtils;
    +
    +import org.apache.hadoop.fs.s3a.S3AFileSystem;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +
    +/**
    + * Simple factory for the S3 file system.
    + */
    +public class S3FileSystemFactory implements FileSystemFactory {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
    +
    +	/** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. */
    +	private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
    +
    +	/** Keys that are replaced (after prefix replacement, to give a more uniform experience
    +	 * across different file system implementations. */
    +	private static final String[][] MIRRORED_CONFIG_KEYS = {
    +			{ "fs.s3a.access-key", "fs.s3a.access.key" },
    +			{ "fs.s3a.secret-key", "fs.s3a.secret.key" }
    --- End diff --
    
    Thanks, I think we will probably add more keys here over time.


---