You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/05/29 15:03:55 UTC

flink git commit: [hotfix][scala-shell] fix location of classes according to package name

Repository: flink
Updated Branches:
  refs/heads/master 5a7ceda61 -> 84fce54b6


[hotfix][scala-shell] fix location of classes according to package name

Javadoc was throwing an error because it expected the class files to be
organized in hierarchical directories.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84fce54b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84fce54b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84fce54b

Branch: refs/heads/master
Commit: 84fce54b652a3f8afca422323fa6e86c76bd55e0
Parents: 5a7ceda
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri May 29 14:53:16 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri May 29 15:03:34 2015 +0200

----------------------------------------------------------------------
 .../org.apache.flink/api/java/JarHelper.java    | 211 -------------------
 .../api/java/ScalaShellRemoteEnvironment.java   |  70 ------
 .../org/apache/flink/api/java/JarHelper.java    | 211 +++++++++++++++++++
 .../api/java/ScalaShellRemoteEnvironment.java   |  70 ++++++
 4 files changed, 281 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
deleted file mode 100644
index 5def4b0..0000000
--- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java;
-
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.InputStream;
-
-import java.util.jar.JarOutputStream;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-
-/**
- * Provides utility services for jarring and unjarring files and directories.
- * Note that a given instance of JarHelper is not threadsafe with respect to
- * multiple jar operations.
- *
- * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
- *
- * @author Patrick Calahan <pc...@bea.com>
- */
-public class JarHelper
-{
-	// ========================================================================
-	// Constants
-
-	private static final int BUFFER_SIZE = 2156;
-
-	// ========================================================================
-	// Variables
-
-	private byte[] mBuffer = new byte[BUFFER_SIZE];
-	private int mByteCount = 0;
-	private boolean mVerbose = false;
-	private String mDestJarName = "";
-
-	// ========================================================================
-	// Constructor
-
-	/**
-	 * Instantiates a new JarHelper.
-	 */
-	public JarHelper() {}
-
-	// ========================================================================
-	// Public methods
-
-	/**
-	 * Jars a given directory or single file into a JarOutputStream.
-	 */
-	public void jarDir(File dirOrFile2Jar, File destJar)
-		throws IOException {
-
-	if (dirOrFile2Jar == null || destJar == null)
-	{
-		throw new IllegalArgumentException();
-	}
-
-	mDestJarName = destJar.getCanonicalPath();
-	FileOutputStream fout = new FileOutputStream(destJar);
-	JarOutputStream jout = new JarOutputStream(fout);
-	//jout.setLevel(0);
-	try {
-		jarDir(dirOrFile2Jar, jout, null);
-	} catch(IOException ioe) {
-		throw ioe;
-	} finally {
-		jout.close();
-		fout.close();
-	}
-	}
-
-	/**
-	 * Unjars a given jar file into a given directory.
-	 */
-	public void unjarDir(File jarFile, File destDir) throws IOException {
-	BufferedOutputStream dest = null;
-	FileInputStream fis = new FileInputStream(jarFile);
-	unjar(fis, destDir);
-	}
-
-	/**
-	 * Given an InputStream on a jar file, unjars the contents into the given
-	 * directory.
-	 */
-	public void unjar(InputStream in, File destDir) throws IOException {
-	BufferedOutputStream dest = null;
-	JarInputStream jis = new JarInputStream(in);
-	JarEntry entry;
-	while ((entry = jis.getNextJarEntry()) != null) {
-		if (entry.isDirectory()) {
-		File dir = new File(destDir,entry.getName());
-		dir.mkdir();
-		if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());}
-		continue;
-		}
-		int count;
-		byte[] data = new byte[ BUFFER_SIZE ];
-		File destFile = new File(destDir, entry.getName());
-		if (mVerbose) {
-			System.out.println("unjarring " + destFile +
-					" from " + entry.getName());
-		}
-		FileOutputStream fos = new FileOutputStream(destFile);
-		dest = new BufferedOutputStream(fos, BUFFER_SIZE);
-		while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
-		dest.write(data, 0, count);
-		}
-		dest.flush();
-		dest.close();
-		if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
-	}
-	jis.close();
-	}
-
-	public void setVerbose(boolean b) {
-	mVerbose = b;
-	}
-
-	// ========================================================================
-	// Private methods
-
-	private static final char SEP = '/';
-	/**
-	 * Recursively jars up the given path under the given directory.
-	 */
-	private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
-		throws IOException {
-	if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
-	if (dirOrFile2jar.isDirectory()) {
-		String[] dirList = dirOrFile2jar.list();
-		String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
-		if (path != null) {
-		JarEntry je = new JarEntry(subPath);
-		je.setTime(dirOrFile2jar.lastModified());
-		jos.putNextEntry(je);
-		jos.flush();
-		jos.closeEntry();
-		}
-		for (int i = 0; i < dirList.length; i++) {
-		File f = new File(dirOrFile2jar, dirList[i]);
-		jarDir(f,jos,subPath);
-		}
-	} else {
-		if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
-		{
-		if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
-		return;
-		}
-
-		if (mVerbose) {
-			System.out.println("adding " + dirOrFile2jar.getPath());
-		}
-		FileInputStream fis = new FileInputStream(dirOrFile2jar);
-		try {
-		JarEntry entry = new JarEntry(path+dirOrFile2jar.getName());
-		entry.setTime(dirOrFile2jar.lastModified());
-		jos.putNextEntry(entry);
-		while ((mByteCount = fis.read(mBuffer)) != -1) {
-			jos.write(mBuffer, 0, mByteCount);
-			if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
-		}
-		jos.flush();
-		jos.closeEntry();
-		} catch (IOException ioe) {
-		throw ioe;
-		} finally {
-		fis.close();
-		}
-	}
-	}
-
-	// for debugging
-	public static void main(String[] args)
-		throws IOException
-	{
-	if (args.length < 2)
-	{
-		System.err.println("Usage: JarHelper jarname.jar directory");
-		return;
-	}
-
-	JarHelper jarHelper = new JarHelper();
-	jarHelper.mVerbose = true;
-
-	File destJar = new File(args[0]);
-	File dirOrFile2Jar = new File(args[1]);
-
-	jarHelper.jarDir(dirOrFile2Jar, destJar);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
deleted file mode 100644
index 79f9576..0000000
--- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
+++ /dev/null
@@ -1,70 +0,0 @@
-
-package org.apache.flink.api.java;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *	 http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.PlanExecutor;
-
-import org.apache.flink.api.scala.FlinkILoop;
-
-/**
- * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
- * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
- * use the reference of the ILoop to write the compiled classes of the current session to
- * a Jar file and submit these with the program.
- */
-public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
-
-	// reference to Scala Shell, for access to virtual directory
-	private FlinkILoop flinkILoop;
-
-	/**
-	 * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop
-	 *
-	 * @param host	   The host name or address of the master (JobManager), where the program should be executed.
-	 * @param port	   The port of the master (JobManager), where the program should be executed.
-	 * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
-	 */
-	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
-		super(host, port, jarFiles);
-		this.flinkILoop = flinkILoop;
-	}
-
-	/**
-	 * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment
-	 *
-	 * @param jobName name of the job as string
-	 * @return Result of the computation
-	 * @throws Exception
-	 */
-	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		Plan p = createProgramPlan(jobName);
-
-		String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
-
-		// call "traditional" execution methods
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
-
-		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-		return executor.executePlan(p);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
new file mode 100644
index 0000000..5def4b0
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+import java.util.jar.JarOutputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan <pc...@bea.com>
+ */
+public class JarHelper
+{
+	// ========================================================================
+	// Constants
+
+	private static final int BUFFER_SIZE = 2156;
+
+	// ========================================================================
+	// Variables
+
+	private byte[] mBuffer = new byte[BUFFER_SIZE];
+	private int mByteCount = 0;
+	private boolean mVerbose = false;
+	private String mDestJarName = "";
+
+	// ========================================================================
+	// Constructor
+
+	/**
+	 * Instantiates a new JarHelper.
+	 */
+	public JarHelper() {}
+
+	// ========================================================================
+	// Public methods
+
+	/**
+	 * Jars a given directory or single file into a JarOutputStream.
+	 */
+	public void jarDir(File dirOrFile2Jar, File destJar)
+		throws IOException {
+
+	if (dirOrFile2Jar == null || destJar == null)
+	{
+		throw new IllegalArgumentException();
+	}
+
+	mDestJarName = destJar.getCanonicalPath();
+	FileOutputStream fout = new FileOutputStream(destJar);
+	JarOutputStream jout = new JarOutputStream(fout);
+	//jout.setLevel(0);
+	try {
+		jarDir(dirOrFile2Jar, jout, null);
+	} catch(IOException ioe) {
+		throw ioe;
+	} finally {
+		jout.close();
+		fout.close();
+	}
+	}
+
+	/**
+	 * Unjars a given jar file into a given directory.
+	 */
+	public void unjarDir(File jarFile, File destDir) throws IOException {
+	BufferedOutputStream dest = null;
+	FileInputStream fis = new FileInputStream(jarFile);
+	unjar(fis, destDir);
+	}
+
+	/**
+	 * Given an InputStream on a jar file, unjars the contents into the given
+	 * directory.
+	 */
+	public void unjar(InputStream in, File destDir) throws IOException {
+	BufferedOutputStream dest = null;
+	JarInputStream jis = new JarInputStream(in);
+	JarEntry entry;
+	while ((entry = jis.getNextJarEntry()) != null) {
+		if (entry.isDirectory()) {
+		File dir = new File(destDir,entry.getName());
+		dir.mkdir();
+		if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());}
+		continue;
+		}
+		int count;
+		byte[] data = new byte[ BUFFER_SIZE ];
+		File destFile = new File(destDir, entry.getName());
+		if (mVerbose) {
+			System.out.println("unjarring " + destFile +
+					" from " + entry.getName());
+		}
+		FileOutputStream fos = new FileOutputStream(destFile);
+		dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+		while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+		dest.write(data, 0, count);
+		}
+		dest.flush();
+		dest.close();
+		if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
+	}
+	jis.close();
+	}
+
+	public void setVerbose(boolean b) {
+	mVerbose = b;
+	}
+
+	// ========================================================================
+	// Private methods
+
+	private static final char SEP = '/';
+	/**
+	 * Recursively jars up the given path under the given directory.
+	 */
+	private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+		throws IOException {
+	if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
+	if (dirOrFile2jar.isDirectory()) {
+		String[] dirList = dirOrFile2jar.list();
+		String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
+		if (path != null) {
+		JarEntry je = new JarEntry(subPath);
+		je.setTime(dirOrFile2jar.lastModified());
+		jos.putNextEntry(je);
+		jos.flush();
+		jos.closeEntry();
+		}
+		for (int i = 0; i < dirList.length; i++) {
+		File f = new File(dirOrFile2jar, dirList[i]);
+		jarDir(f,jos,subPath);
+		}
+	} else {
+		if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
+		{
+		if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
+		return;
+		}
+
+		if (mVerbose) {
+			System.out.println("adding " + dirOrFile2jar.getPath());
+		}
+		FileInputStream fis = new FileInputStream(dirOrFile2jar);
+		try {
+		JarEntry entry = new JarEntry(path+dirOrFile2jar.getName());
+		entry.setTime(dirOrFile2jar.lastModified());
+		jos.putNextEntry(entry);
+		while ((mByteCount = fis.read(mBuffer)) != -1) {
+			jos.write(mBuffer, 0, mByteCount);
+			if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
+		}
+		jos.flush();
+		jos.closeEntry();
+		} catch (IOException ioe) {
+		throw ioe;
+		} finally {
+		fis.close();
+		}
+	}
+	}
+
+	// for debugging
+	public static void main(String[] args)
+		throws IOException
+	{
+	if (args.length < 2)
+	{
+		System.err.println("Usage: JarHelper jarname.jar directory");
+		return;
+	}
+
+	JarHelper jarHelper = new JarHelper();
+	jarHelper.mVerbose = true;
+
+	File destJar = new File(args[0]);
+	File dirOrFile2Jar = new File(args[1]);
+
+	jarHelper.jarDir(dirOrFile2Jar, destJar);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
new file mode 100644
index 0000000..79f9576
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -0,0 +1,70 @@
+
+package org.apache.flink.api.java;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+
+import org.apache.flink.api.scala.FlinkILoop;
+
+/**
+ * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
+ * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
+ * use the reference of the ILoop to write the compiled classes of the current session to
+ * a Jar file and submit these with the program.
+ */
+public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
+
+	// reference to Scala Shell, for access to virtual directory
+	private FlinkILoop flinkILoop;
+
+	/**
+	 * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop
+	 *
+	 * @param host	   The host name or address of the master (JobManager), where the program should be executed.
+	 * @param port	   The port of the master (JobManager), where the program should be executed.
+	 * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
+	 */
+	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
+		super(host, port, jarFiles);
+		this.flinkILoop = flinkILoop;
+	}
+
+	/**
+	 * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment
+	 *
+	 * @param jobName name of the job as string
+	 * @return Result of the computation
+	 * @throws Exception
+	 */
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		Plan p = createProgramPlan(jobName);
+
+		String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
+
+		// call "traditional" execution methods
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
+
+		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
+		return executor.executePlan(p);
+	}
+}