You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/05/29 15:03:55 UTC
flink git commit: [hotfix][scala-shell] fix location of classes according to package name
Repository: flink
Updated Branches:
refs/heads/master 5a7ceda61 -> 84fce54b6
[hotfix][scala-shell] fix location of classes according to package name
Javadoc was throwing an error because it expected the class files to be
organized in hierarchical directories.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84fce54b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84fce54b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84fce54b
Branch: refs/heads/master
Commit: 84fce54b652a3f8afca422323fa6e86c76bd55e0
Parents: 5a7ceda
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri May 29 14:53:16 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri May 29 15:03:34 2015 +0200
----------------------------------------------------------------------
.../org.apache.flink/api/java/JarHelper.java | 211 -------------------
.../api/java/ScalaShellRemoteEnvironment.java | 70 ------
.../org/apache/flink/api/java/JarHelper.java | 211 +++++++++++++++++++
.../api/java/ScalaShellRemoteEnvironment.java | 70 ++++++
4 files changed, 281 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
deleted file mode 100644
index 5def4b0..0000000
--- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java;
-
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.InputStream;
-
-import java.util.jar.JarOutputStream;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-
-/**
- * Provides utility services for jarring and unjarring files and directories.
- * Note that a given instance of JarHelper is not threadsafe with respect to
- * multiple jar operations.
- *
- * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
- *
- * @author Patrick Calahan <pc...@bea.com>
- */
-public class JarHelper
-{
- // ========================================================================
- // Constants
-
- private static final int BUFFER_SIZE = 2156;
-
- // ========================================================================
- // Variables
-
- private byte[] mBuffer = new byte[BUFFER_SIZE];
- private int mByteCount = 0;
- private boolean mVerbose = false;
- private String mDestJarName = "";
-
- // ========================================================================
- // Constructor
-
- /**
- * Instantiates a new JarHelper.
- */
- public JarHelper() {}
-
- // ========================================================================
- // Public methods
-
- /**
- * Jars a given directory or single file into a JarOutputStream.
- */
- public void jarDir(File dirOrFile2Jar, File destJar)
- throws IOException {
-
- if (dirOrFile2Jar == null || destJar == null)
- {
- throw new IllegalArgumentException();
- }
-
- mDestJarName = destJar.getCanonicalPath();
- FileOutputStream fout = new FileOutputStream(destJar);
- JarOutputStream jout = new JarOutputStream(fout);
- //jout.setLevel(0);
- try {
- jarDir(dirOrFile2Jar, jout, null);
- } catch(IOException ioe) {
- throw ioe;
- } finally {
- jout.close();
- fout.close();
- }
- }
-
- /**
- * Unjars a given jar file into a given directory.
- */
- public void unjarDir(File jarFile, File destDir) throws IOException {
- BufferedOutputStream dest = null;
- FileInputStream fis = new FileInputStream(jarFile);
- unjar(fis, destDir);
- }
-
- /**
- * Given an InputStream on a jar file, unjars the contents into the given
- * directory.
- */
- public void unjar(InputStream in, File destDir) throws IOException {
- BufferedOutputStream dest = null;
- JarInputStream jis = new JarInputStream(in);
- JarEntry entry;
- while ((entry = jis.getNextJarEntry()) != null) {
- if (entry.isDirectory()) {
- File dir = new File(destDir,entry.getName());
- dir.mkdir();
- if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());}
- continue;
- }
- int count;
- byte[] data = new byte[ BUFFER_SIZE ];
- File destFile = new File(destDir, entry.getName());
- if (mVerbose) {
- System.out.println("unjarring " + destFile +
- " from " + entry.getName());
- }
- FileOutputStream fos = new FileOutputStream(destFile);
- dest = new BufferedOutputStream(fos, BUFFER_SIZE);
- while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
- dest.write(data, 0, count);
- }
- dest.flush();
- dest.close();
- if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
- }
- jis.close();
- }
-
- public void setVerbose(boolean b) {
- mVerbose = b;
- }
-
- // ========================================================================
- // Private methods
-
- private static final char SEP = '/';
- /**
- * Recursively jars up the given path under the given directory.
- */
- private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
- throws IOException {
- if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
- if (dirOrFile2jar.isDirectory()) {
- String[] dirList = dirOrFile2jar.list();
- String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
- if (path != null) {
- JarEntry je = new JarEntry(subPath);
- je.setTime(dirOrFile2jar.lastModified());
- jos.putNextEntry(je);
- jos.flush();
- jos.closeEntry();
- }
- for (int i = 0; i < dirList.length; i++) {
- File f = new File(dirOrFile2jar, dirList[i]);
- jarDir(f,jos,subPath);
- }
- } else {
- if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
- {
- if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
- return;
- }
-
- if (mVerbose) {
- System.out.println("adding " + dirOrFile2jar.getPath());
- }
- FileInputStream fis = new FileInputStream(dirOrFile2jar);
- try {
- JarEntry entry = new JarEntry(path+dirOrFile2jar.getName());
- entry.setTime(dirOrFile2jar.lastModified());
- jos.putNextEntry(entry);
- while ((mByteCount = fis.read(mBuffer)) != -1) {
- jos.write(mBuffer, 0, mByteCount);
- if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
- }
- jos.flush();
- jos.closeEntry();
- } catch (IOException ioe) {
- throw ioe;
- } finally {
- fis.close();
- }
- }
- }
-
- // for debugging
- public static void main(String[] args)
- throws IOException
- {
- if (args.length < 2)
- {
- System.err.println("Usage: JarHelper jarname.jar directory");
- return;
- }
-
- JarHelper jarHelper = new JarHelper();
- jarHelper.mVerbose = true;
-
- File destJar = new File(args[0]);
- File dirOrFile2Jar = new File(args[1]);
-
- jarHelper.jarDir(dirOrFile2Jar, destJar);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
deleted file mode 100644
index 79f9576..0000000
--- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
+++ /dev/null
@@ -1,70 +0,0 @@
-
-package org.apache.flink.api.java;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.PlanExecutor;
-
-import org.apache.flink.api.scala.FlinkILoop;
-
-/**
- * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
- * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
- * use the reference of the ILoop to write the compiled classes of the current session to
- * a Jar file and submit these with the program.
- */
-public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
-
- // reference to Scala Shell, for access to virtual directory
- private FlinkILoop flinkILoop;
-
- /**
- * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop
- *
- * @param host The host name or address of the master (JobManager), where the program should be executed.
- * @param port The port of the master (JobManager), where the program should be executed.
- * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
- */
- public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
- super(host, port, jarFiles);
- this.flinkILoop = flinkILoop;
- }
-
- /**
- * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment
- *
- * @param jobName name of the job as string
- * @return Result of the computation
- * @throws Exception
- */
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- Plan p = createProgramPlan(jobName);
-
- String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
-
- // call "traditional" execution methods
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
-
- executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
- return executor.executePlan(p);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
new file mode 100644
index 0000000..5def4b0
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+import java.util.jar.JarOutputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan <pc...@bea.com>
+ */
+public class JarHelper
+{
+ // ========================================================================
+ // Constants
+
+ private static final int BUFFER_SIZE = 2156;
+
+ // ========================================================================
+ // Variables
+
+ private byte[] mBuffer = new byte[BUFFER_SIZE];
+ private int mByteCount = 0;
+ private boolean mVerbose = false;
+ private String mDestJarName = "";
+
+ // ========================================================================
+ // Constructor
+
+ /**
+ * Instantiates a new JarHelper.
+ */
+ public JarHelper() {}
+
+ // ========================================================================
+ // Public methods
+
+ /**
+ * Jars a given directory or single file into a JarOutputStream.
+ */
+ public void jarDir(File dirOrFile2Jar, File destJar)
+ throws IOException {
+
+ if (dirOrFile2Jar == null || destJar == null)
+ {
+ throw new IllegalArgumentException();
+ }
+
+ mDestJarName = destJar.getCanonicalPath();
+ FileOutputStream fout = new FileOutputStream(destJar);
+ JarOutputStream jout = new JarOutputStream(fout);
+ //jout.setLevel(0);
+ try {
+ jarDir(dirOrFile2Jar, jout, null);
+ } catch(IOException ioe) {
+ throw ioe;
+ } finally {
+ jout.close();
+ fout.close();
+ }
+ }
+
+ /**
+ * Unjars a given jar file into a given directory.
+ */
+ public void unjarDir(File jarFile, File destDir) throws IOException {
+ BufferedOutputStream dest = null;
+ FileInputStream fis = new FileInputStream(jarFile);
+ unjar(fis, destDir);
+ }
+
+ /**
+ * Given an InputStream on a jar file, unjars the contents into the given
+ * directory.
+ */
+ public void unjar(InputStream in, File destDir) throws IOException {
+ BufferedOutputStream dest = null;
+ JarInputStream jis = new JarInputStream(in);
+ JarEntry entry;
+ while ((entry = jis.getNextJarEntry()) != null) {
+ if (entry.isDirectory()) {
+ File dir = new File(destDir,entry.getName());
+ dir.mkdir();
+ if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());}
+ continue;
+ }
+ int count;
+ byte[] data = new byte[ BUFFER_SIZE ];
+ File destFile = new File(destDir, entry.getName());
+ if (mVerbose) {
+ System.out.println("unjarring " + destFile +
+ " from " + entry.getName());
+ }
+ FileOutputStream fos = new FileOutputStream(destFile);
+ dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+ while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+ dest.write(data, 0, count);
+ }
+ dest.flush();
+ dest.close();
+ if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
+ }
+ jis.close();
+ }
+
+ public void setVerbose(boolean b) {
+ mVerbose = b;
+ }
+
+ // ========================================================================
+ // Private methods
+
+ private static final char SEP = '/';
+ /**
+ * Recursively jars up the given path under the given directory.
+ */
+ private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+ throws IOException {
+ if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
+ if (dirOrFile2jar.isDirectory()) {
+ String[] dirList = dirOrFile2jar.list();
+ String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
+ if (path != null) {
+ JarEntry je = new JarEntry(subPath);
+ je.setTime(dirOrFile2jar.lastModified());
+ jos.putNextEntry(je);
+ jos.flush();
+ jos.closeEntry();
+ }
+ for (int i = 0; i < dirList.length; i++) {
+ File f = new File(dirOrFile2jar, dirList[i]);
+ jarDir(f,jos,subPath);
+ }
+ } else {
+ if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
+ {
+ if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
+ return;
+ }
+
+ if (mVerbose) {
+ System.out.println("adding " + dirOrFile2jar.getPath());
+ }
+ FileInputStream fis = new FileInputStream(dirOrFile2jar);
+ try {
+ JarEntry entry = new JarEntry(path+dirOrFile2jar.getName());
+ entry.setTime(dirOrFile2jar.lastModified());
+ jos.putNextEntry(entry);
+ while ((mByteCount = fis.read(mBuffer)) != -1) {
+ jos.write(mBuffer, 0, mByteCount);
+ if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
+ }
+ jos.flush();
+ jos.closeEntry();
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ fis.close();
+ }
+ }
+ }
+
+ // for debugging
+ public static void main(String[] args)
+ throws IOException
+ {
+ if (args.length < 2)
+ {
+ System.err.println("Usage: JarHelper jarname.jar directory");
+ return;
+ }
+
+ JarHelper jarHelper = new JarHelper();
+ jarHelper.mVerbose = true;
+
+ File destJar = new File(args[0]);
+ File dirOrFile2Jar = new File(args[1]);
+
+ jarHelper.jarDir(dirOrFile2Jar, destJar);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
new file mode 100644
index 0000000..79f9576
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -0,0 +1,70 @@
+
+package org.apache.flink.api.java;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+
+import org.apache.flink.api.scala.FlinkILoop;
+
+/**
+ * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
+ * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
+ * use the reference of the ILoop to write the compiled classes of the current session to
+ * a Jar file and submit these with the program.
+ */
+public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
+
+ // reference to Scala Shell, for access to virtual directory
+ private FlinkILoop flinkILoop;
+
+ /**
+ * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
+ */
+ public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
+ super(host, port, jarFiles);
+ this.flinkILoop = flinkILoop;
+ }
+
+ /**
+ * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment
+ *
+ * @param jobName name of the job as string
+ * @return Result of the computation
+ * @throws Exception
+ */
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ Plan p = createProgramPlan(jobName);
+
+ String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
+
+ // call "traditional" execution methods
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
+
+ executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
+ return executor.executePlan(p);
+ }
+}