You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/21 21:11:46 UTC

flink git commit: [FLINK-8123][py] Bundle python scripts in jar

Repository: flink
Updated Branches:
  refs/heads/master 44c603d2b -> c4107d4c3


[FLINK-8123][py] Bundle python scripts in jar


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4107d4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4107d4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4107d4c

Branch: refs/heads/master
Commit: c4107d4c336ed8dbadc03a7018eb255f4df3d1cc
Parents: 44c603d
Author: zentol <ch...@apache.org>
Authored: Tue Nov 21 14:15:24 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Nov 21 22:11:20 2017 +0100

----------------------------------------------------------------------
 flink-dist/src/main/assemblies/bin.xml          |  9 ---
 flink-libraries/flink-python/pom.xml            | 66 +++++++++++++++-----
 .../flink-python/src/assembly/python.xml        | 37 +++++++++++
 .../flink/python/api/PythonPlanBinder.java      | 55 +++++++++++-----
 4 files changed, 128 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index eb6867d..4415d25 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -193,15 +193,6 @@ under the License.
 			</includes>
 		</fileSet>
 
-		<!-- copy python package -->
-		<fileSet>
-			<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
-			<outputDirectory>resources/python</outputDirectory>
-			<fileMode>0755</fileMode>
-			<excludes>
-				<exclude>**/example/**</exclude>
-			</excludes>
-		</fileSet>
 		<!-- copy python example to examples of dist -->
 		<fileSet>
 			<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory>

http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml
index 310be72..eaa5b87 100644
--- a/flink-libraries/flink-python/pom.xml
+++ b/flink-libraries/flink-python/pom.xml
@@ -30,24 +30,58 @@ under the License.
     <artifactId>flink-python_${scala.binary.version}</artifactId>
     <name>flink-python</name>
     <packaging>jar</packaging>
+	
+	<build>
+		<resources>
+			<resource>
+				<!-- include the zip generated by the assembly-plugin in the jar as a resource -->
+				<directory>target</directory>
+				<includes>
+					<include>python-source.zip</include>
+				</includes>
+			</resource>
+		</resources>
 
-    <build>
         <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                    <archive>
-                        <manifest>
-                            <addClasspath>true</addClasspath>
-                            <mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-            </plugin>
+			<plugin>
+				<!-- generate zip containing the flink python library -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<descriptor>src/assembly/python.xml</descriptor>
+					<finalName>python-source</finalName>
+					<appendAssemblyId>false</appendAssemblyId>
+				</configuration>
+				<executions>
+					<execution>
+						<phase>generate-resources</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/assembly/python.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/assembly/python.xml b/flink-libraries/flink-python/src/assembly/python.xml
new file mode 100644
index 0000000..4487b7c
--- /dev/null
+++ b/flink-libraries/flink-python/src/assembly/python.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly
+	xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>python</id>
+	<formats>
+		<format>zip</format>
+	</formats>
+
+	<includeBaseDirectory>false</includeBaseDirectory>
+
+	<fileSets>
+		<fileSet>
+			<directory>src/main/python/org/apache/flink/python/api/flink</directory>
+			<outputDirectory>flink</outputDirectory>
+		</fileSet>
+	</fileSets>
+
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index e0c8215..e4aa518 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint;
@@ -45,6 +46,7 @@ import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.python.api.util.SetCache;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +56,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
 import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
@@ -72,12 +76,8 @@ public class PythonPlanBinder {
 	public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
 	public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
 
-	private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
-
 	private final Configuration operatorConfig;
 
-	private final String pythonLibraryPath;
-
 	private final String tmpPlanFilesDir;
 	private Path tmpDistributedDir;
 
@@ -110,13 +110,6 @@ public class PythonPlanBinder {
 
 		tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
 
-		String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
-		pythonLibraryPath = flinkRootDir != null
-				//command-line
-				? flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH
-				//testing
-				: new File(System.getProperty("user.dir"), "src/main/python/org/apache/flink/python/api").getAbsolutePath();
-
 		operatorConfig = new Configuration();
 		operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
 		String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
@@ -163,10 +156,16 @@ public class PythonPlanBinder {
 				}
 			}
 
-			// copy flink library, plan file and additional files to temporary location
+			// setup temporary local directory for flink python library and user files
+			Path targetDir = new Path(tmpPlanFilesDir);
+			deleteIfExists(targetDir);
+			targetDir.getFileSystem().mkdirs(targetDir);
+
+			// extract and unzip flink library to temporary location
+			unzipPythonLibrary(new Path(tmpPlanFilesDir));
+
+			// copy user files to temporary location
 			Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
-			deleteIfExists(tmpPlanFilesPath);
-			FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
 			copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
 			for (String file : filesToCopy) {
 				Path source = new Path(file);
@@ -213,6 +212,34 @@ public class PythonPlanBinder {
 		}
 	}
 
+	/**
+	 * Extracts the flink python library (bundled in this jar as "python-source.zip") into the given directory.
+	 */
+	private static void unzipPythonLibrary(Path targetDir) throws IOException {
+		FileSystem targetFs = targetDir.getFileSystem();
+		java.io.InputStream zipResource = PythonPlanBinder.class.getClassLoader().getResourceAsStream("python-source.zip");
+		if (zipResource == null) {
+			throw new FileNotFoundException("Flink python library (python-source.zip) not found on the classpath.");
+		}
+		try (ZipInputStream zis = new ZipInputStream(zipResource)) {
+			for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = zis.getNextEntry()) {
+				Path newFile = new Path(targetDir, entry.getName());
+				if (entry.isDirectory()) {
+					targetFs.mkdirs(newFile);
+				} else {
+					LOG.debug("Unzipping to {}.", newFile);
+					// close (and thereby flush) every extracted file; zis itself must stay open for the next entry
+					try (FSDataOutputStream out = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+						IOUtils.copyBytes(zis, out, false);
+					} catch (IOException e) {
+						throw new IOException("Failed to unzip flink python library.", e);
+					}
+				}
+				zis.closeEntry();
+			}
+		}
+	}
+
 	//=====Setup========================================================================================================
 
 	private static void deleteIfExists(Path path) throws IOException {