Posted to issues@flink.apache.org by jelmerk <gi...@git.apache.org> on 2018/01/14 15:11:36 UTC
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
GitHub user jelmerk opened a pull request:
https://github.com/apache/flink/pull/5296
[FLINK-8432] [filesystem-connector] Add support for openstack's swift filesystem
## What is the purpose of the change
Add support for OpenStack's cloud storage solution (the Swift file system) as a Flink file system
## Brief change log
- Added a new module, flink-openstack-fs-hadoop, below flink-filesystems
## Verifying this change
This change added tests and can be verified as follows:
- Added integration tests for simple reading and writing, and for listing directories
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: don't know
- The runtime per-record code paths (performance sensitive): don't know
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jelmerk/flink openstack_fs_support
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5296.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5296
----
commit d46696192cc57ba9df07fabf24c6d66538ec186d
Author: Jelmer Kuperus <jk...@...>
Date: 2018-01-14T09:42:02Z
[FLINK-8432] Add support for openstack's swift filesystem
----
---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5296#discussion_r164116387
--- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+ /** The prefixes that Flink adds to the Hadoop config under 'fs.swift.'. */
+ private static final String CONFIG_PREFIX = "swift.";
+
+ /** Flink's configuration object. */
+ private Configuration flinkConfig;
+
+ /** Hadoop's configuration for the file systems, lazily initialized. */
+ private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+ @Override
+ public String getScheme() {
+ return "swift";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+ flinkConfig = config;
+ hadoopConfig = null;
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)");
+
+ try {
+ // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
+
+ org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
+ if (hadoopConfig == null) {
+ if (flinkConfig != null) {
+ LOG.debug("Loading Hadoop configuration for swift native file system");
+ hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+ // hadoop.tmp.dir needs to be defined because it is used as buffer directory
+ if (hadoopConfig.get("hadoop.tmp.dir") == null) {
+ String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name");
--- End diff --
Let's use `CoreOptions#TMP_DIRS` instead of using directly `System.getProperty("java.io.tmpdir")`. That way we will use the Flink configured tmp directory.
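A minimal sketch of what this suggestion could look like, assuming the configured temp directories arrive as one delimited string. It uses a plain `Map` in place of Flink's `Configuration` so that it runs on its own; the class name, the method name, and the key `io.tmp.dirs` are assumptions made for illustration and are not taken from this PR.

```java
import java.io.File;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the tmp-dir logic in SwiftFileSystemFactory. */
final class HadoopTmpDirSketch {

    /** Assumed key for the option behind CoreOptions#TMP_DIRS. */
    static final String TMP_DIRS_KEY = "io.tmp.dirs";

    /** Matches a comma or the platform path separator between directories. */
    private static final Pattern SEPARATOR =
            Pattern.compile(",|" + Pattern.quote(File.pathSeparator));

    /**
     * Returns the value to use for hadoop.tmp.dir: the first configured
     * temp directory (or java.io.tmpdir if none is configured), with a
     * per-user "hadoop-<user>" subdirectory appended.
     */
    static String resolveHadoopTmpDir(Map<String, String> flinkConfig, String userName) {
        String configured = flinkConfig.get(TMP_DIRS_KEY);
        String baseDir;
        if (configured == null || configured.trim().isEmpty()) {
            // Nothing configured: fall back to the JVM default.
            baseDir = System.getProperty("java.io.tmpdir");
        } else {
            // Several directories may be listed; this sketch uses the first one.
            baseDir = SEPARATOR.split(configured.trim())[0].trim();
        }
        // Let File join the parts instead of concatenating with "/".
        return new File(baseDir, "hadoop-" + userName).getPath();
    }
}
```

Building the path with `File` rather than string concatenation avoids hard-coding the `/` separator that the line under review uses.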
---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5296
---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Posted by jelmerk <gi...@git.apache.org>.
Github user jelmerk commented on a diff in the pull request:
https://github.com/apache/flink/pull/5296#discussion_r164158043
--- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java ---
+ String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name");
--- End diff --
Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209
---
[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/5296
Great. Thanks a lot for addressing my comments so fast @jelmerk and @etiennecarriere. Thanks a lot for your contribution. Merging this PR.
---
[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...
Posted by etiennecarriere <gi...@git.apache.org>.
Github user etiennecarriere commented on the issue:
https://github.com/apache/flink/pull/5296
We tested this PR with Flink 1.4 and the Swift service offered by the French hosting provider OVH.
It worked fine for checkpointing.
---
[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...
Posted by jelmerk <gi...@git.apache.org>.
Github user jelmerk commented on the issue:
https://github.com/apache/flink/pull/5296
All valid points, thanks for the review!
---
[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...
Posted by jelmerk <gi...@git.apache.org>.
Github user jelmerk commented on the issue:
https://github.com/apache/flink/pull/5296
Hi @tillrohrmann, those files with a lot of changes are unfortunately copied from Hadoop; the README file contains more information about how this works. I am not overly happy about this approach, but it's basically the exact same approach that was taken for the S3 connector: https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-hadoop
---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5296#discussion_r164116058
--- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java ---
+ // hadoop.tmp.dir needs to be defined because it is used as buffer directory
+ if (hadoopConfig.get("hadoop.tmp.dir") == null) {
+ String tmpDir = System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + System.getProperty("user.name");
+ hadoopConfig.set("hadoop.tmp.dir", tmpDir);
+ }
+
+ // add additional config entries from the Flink config to the Presto Hadoop config
+ for (String key : flinkConfig.keySet()) {
+ if (key.startsWith(CONFIG_PREFIX)) {
+ String value = flinkConfig.getString(key, null);
+ String newKey = "fs.swift." + key.substring(CONFIG_PREFIX.length());
+ hadoopConfig.set(newKey, flinkConfig.getString(key, null));
--- End diff --
`flinkConfig.getString(key, null)` can be replaced by `value`.
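The loop under review can be sketched in isolation as follows, with the lookup done once and `value` reused. Plain `Map`s stand in for the Flink and Hadoop configuration objects so the example runs on its own; the class and method names are hypothetical, and only the two prefixes come from the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the key-forwarding loop in SwiftFileSystemFactory. */
final class SwiftConfigMirrorSketch {

    /** Prefix of the Flink-side keys, as in the diff. */
    static final String CONFIG_PREFIX = "swift.";

    /** Prefix of the Hadoop-side keys, as in the diff. */
    static final String HADOOP_PREFIX = "fs.swift.";

    /** Copies every "swift.*" entry into a new map under "fs.swift.*". */
    static Map<String, String> mirror(Map<String, String> flinkConfig) {
        Map<String, String> hadoopConfig = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(CONFIG_PREFIX)) {
                // Read the value once and reuse it, as the review suggests.
                String value = entry.getValue();
                String newKey = HADOOP_PREFIX + key.substring(CONFIG_PREFIX.length());
                hadoopConfig.put(newKey, value);
            }
        }
        return hadoopConfig;
    }
}
```

Keys without the `swift.` prefix are left out of the result, so unrelated Flink options are not forwarded to the Hadoop side.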
---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Posted by jelmerk <gi...@git.apache.org>.
Github user jelmerk commented on a diff in the pull request:
https://github.com/apache/flink/pull/5296#discussion_r164158508
--- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the Swift file system support.
+ */
+public class HadoopSwiftFileSystemITCase extends TestLogger {
--- End diff --
I did
---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Posted by jelmerk <gi...@git.apache.org>.
Github user jelmerk commented on a diff in the pull request:
https://github.com/apache/flink/pull/5296#discussion_r164158160
--- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java ---
+ for (String key : flinkConfig.keySet()) {
+ if (key.startsWith(CONFIG_PREFIX)) {
+ String value = flinkConfig.getString(key, null);
+ String newKey = "fs.swift." + key.substring(CONFIG_PREFIX.length());
+ hadoopConfig.set(newKey, flinkConfig.getString(key, null));
--- End diff --
Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209
---
[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5296#discussion_r164119372
--- Diff: flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java ---
+/**
+ * Integration tests for the Swift file system support.
+ */
+public class HadoopSwiftFileSystemITCase extends TestLogger {
--- End diff --
I assume you run these tests locally on OpenStack, right?
---