You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/21 21:11:46 UTC
flink git commit: [FLINK-8123][py] Bundle python scripts in jar
Repository: flink
Updated Branches:
refs/heads/master 44c603d2b -> c4107d4c3
[FLINK-8123][py] Bundle python scripts in jar
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4107d4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4107d4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4107d4c
Branch: refs/heads/master
Commit: c4107d4c336ed8dbadc03a7018eb255f4df3d1cc
Parents: 44c603d
Author: zentol <ch...@apache.org>
Authored: Tue Nov 21 14:15:24 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Nov 21 22:11:20 2017 +0100
----------------------------------------------------------------------
flink-dist/src/main/assemblies/bin.xml | 9 ---
flink-libraries/flink-python/pom.xml | 66 +++++++++++++++-----
.../flink-python/src/assembly/python.xml | 37 +++++++++++
.../flink/python/api/PythonPlanBinder.java | 55 +++++++++++-----
4 files changed, 128 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index eb6867d..4415d25 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -193,15 +193,6 @@ under the License.
</includes>
</fileSet>
- <!-- copy python package -->
- <fileSet>
- <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
- <outputDirectory>resources/python</outputDirectory>
- <fileMode>0755</fileMode>
- <excludes>
- <exclude>**/example/**</exclude>
- </excludes>
- </fileSet>
<!-- copy python example to examples of dist -->
<fileSet>
<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory>
http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml
index 310be72..eaa5b87 100644
--- a/flink-libraries/flink-python/pom.xml
+++ b/flink-libraries/flink-python/pom.xml
@@ -30,24 +30,58 @@ under the License.
<artifactId>flink-python_${scala.binary.version}</artifactId>
<name>flink-python</name>
<packaging>jar</packaging>
+
+ <build>
+ <resources>
+ <resource>
+ <!-- include the zip generated by the assembly-plugin in the jar as a resource; -->
+ <!-- the build directory is referenced via property so that a relocated build directory still works -->
+ <directory>${project.build.directory}</directory>
+ <includes>
+ <include>python-source.zip</include>
+ </includes>
+ </resource>
+ </resources>
- <build>
<plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- <archive>
- <manifest>
- <addClasspath>true</addClasspath>
- <mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
- </manifest>
- </archive>
- </configuration>
- </plugin>
+ <plugin>
+ <!-- generate zip containing the flink python library -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/assembly/python.xml</descriptor>
+ <finalName>python-source</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <!-- shade execution "shade-flink" runs in the package phase; the manifest transformer -->
+ <!-- declares PythonPlanBinder as the main class of the resulting jar -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/assembly/python.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/assembly/python.xml b/flink-libraries/flink-python/src/assembly/python.xml
new file mode 100644
index 0000000..4487b7c
--- /dev/null
+++ b/flink-libraries/flink-python/src/assembly/python.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!-- Assembly descriptor "python": packages the flink python library sources into a zip archive. -->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>python</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+
+ <!-- do not nest the archive contents in an additional base directory -->
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <!-- the contents of the "flink" source directory are placed under "flink" in the archive -->
+ <fileSet>
+ <directory>src/main/python/org/apache/flink/python/api/flink</directory>
+ <outputDirectory>flink</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index e0c8215..e4aa518 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint;
@@ -45,6 +46,7 @@ import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.python.api.util.SetCache;
import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +56,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
@@ -72,12 +76,8 @@ public class PythonPlanBinder {
public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
- private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
-
private final Configuration operatorConfig;
- private final String pythonLibraryPath;
-
private final String tmpPlanFilesDir;
private Path tmpDistributedDir;
@@ -110,13 +110,6 @@ public class PythonPlanBinder {
tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
- String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
- pythonLibraryPath = flinkRootDir != null
- //command-line
- ? flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH
- //testing
- : new File(System.getProperty("user.dir"), "src/main/python/org/apache/flink/python/api").getAbsolutePath();
-
operatorConfig = new Configuration();
operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
@@ -163,10 +156,16 @@ public class PythonPlanBinder {
}
}
- // copy flink library, plan file and additional files to temporary location
+ // setup temporary local directory for flink python library and user files
+ Path targetDir = new Path(tmpPlanFilesDir);
+ deleteIfExists(targetDir);
+ targetDir.getFileSystem().mkdirs(targetDir);
+
+ // extract and unzip flink library to temporary location
+ unzipPythonLibrary(new Path(tmpPlanFilesDir));
+
+ // copy user files to temporary location
Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
- deleteIfExists(tmpPlanFilesPath);
- FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
for (String file : filesToCopy) {
Path source = new Path(file);
@@ -213,6 +212,34 @@ public class PythonPlanBinder {
}
}
+ /**
+  * Extracts the flink python library, bundled on the classpath as {@code python-source.zip},
+  * into the given directory.
+  *
+  * <p>Directory entries are created via {@code mkdirs}; file entries are written with
+  * {@code NO_OVERWRITE}. The archive stream and every output stream are closed on all paths.
+  *
+  * @param targetDir directory into which the archive entries are extracted
+  * @throws IOException if the archive cannot be found on the classpath or an entry cannot be extracted
+  */
+ private static void unzipPythonLibrary(Path targetDir) throws IOException {
+     FileSystem targetFs = targetDir.getFileSystem();
+     ClassLoader classLoader = PythonPlanBinder.class.getClassLoader();
+     // fully qualified to avoid depending on an import that is not visible in this hunk
+     java.io.InputStream zipStream = classLoader.getResourceAsStream("python-source.zip");
+     if (zipStream == null) {
+         // getResourceAsStream returns null for a missing resource; fail with a descriptive error instead of an NPE
+         throw new IOException("Failed to unzip flink python library: python-source.zip was not found on the classpath.");
+     }
+
+     // try-with-resources closes the archive stream on success and on failure
+     try (ZipInputStream zis = new ZipInputStream(zipStream)) {
+         ZipEntry entry = zis.getNextEntry();
+         while (entry != null) {
+             String fileName = entry.getName();
+             Path newFile = new Path(targetDir, fileName);
+             if (entry.isDirectory()) {
+                 targetFs.mkdirs(newFile);
+             } else {
+                 LOG.debug("Unzipping to {}.", newFile);
+                 try (FSDataOutputStream fsDataOutputStream = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+                     // close=false: the zip stream must stay open for the remaining entries;
+                     // the output stream is closed by the enclosing try-with-resources
+                     IOUtils.copyBytes(zis, fsDataOutputStream, false);
+                 } catch (Exception e) {
+                     throw new IOException("Failed to unzip flink python library.", e);
+                 }
+             }
+
+             zis.closeEntry();
+             entry = zis.getNextEntry();
+         }
+     }
+ }
+
//=====Setup========================================================================================================
private static void deleteIfExists(Path path) throws IOException {