Posted to issues@spark.apache.org by "Catalin Alexandru Zamfir (JIRA)" <ji...@apache.org> on 2016/06/01 05:37:13 UTC
[jira] [Comment Edited] (SPARK-15582) Support for Groovy closures
[ https://issues.apache.org/jira/browse/SPARK-15582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308036#comment-15308036 ]
Catalin Alexandru Zamfir edited comment on SPARK-15582 at 6/1/16 5:36 AM:
--------------------------------------------------------------------------
So here's the workaround; I wish there were a way to avoid doing it like this. In short, we are:
- taking the script as text and using CompilationUnit (from Groovy) to compile it to bytecode. This must be done on the driver node before evaluating the script;
- appending each class from the above CU to a JAR on a distributed FS. We give each JAR a unique name, later passed on to addJar. This is just for isolation purposes, so that each script gets its own JAR;
- adding the resulting JAR to the Spark context using addJar;
- dehydrating all closures (Closure.dehydrate ()), which comes with its limitations; nested closures also don't seem to work;
- however, if you design your code around the problem of Groovy closures, using interfaces/abstract classes that you extend or implement in Groovy code, there seems to be no limitation on what you can do (e.g. an AbstractFilter class/interface);
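A minimal sketch of that last point, using only the JDK. AbstractFilter is the name mentioned in the list; its accept method, the SuffixFilter class and the FilterRoundTrip helper are hypothetical and only stand in for what a script would declare. The point it illustrates is that a named class implementing a Serializable interface survives Java serialization, provided the class is available on the side that deserializes it (here, the same JVM).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class FilterRoundTrip {
    // Serializes the filter and reads it back, roughly what happens between driver and executor.
    static AbstractFilter roundTrip(AbstractFilter filter) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(filter);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (AbstractFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        AbstractFilter copy = roundTrip(new SuffixFilter(".bin"));
        System.out.println(copy.accept("part-00000.bin")); // prints "true"
        System.out.println(copy.accept("part-00000.txt")); // prints "false"
    }
}

// Hypothetical contract; assumed to live in the application JAR that every executor already has.
interface AbstractFilter extends Serializable {
    boolean accept(String onePathEntry);
}

// Stand-in for a class the script would declare. Being a named class, it is compiled to
// SuffixFilter.class and can be shipped in the script JAR like any other class.
final class SuffixFilter implements AbstractFilter {
    private static final long serialVersionUID = 1L;
    private final String suffix;

    SuffixFilter(String suffix) {
        this.suffix = suffix;
    }

    @Override
    public boolean accept(String onePathEntry) {
        return onePathEntry.endsWith(suffix);
    }
}
```

In the Groovy script the implementation would be written as a class rather than as a closure, so no dehydration is involved.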
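{noformat}
// Imports this snippet is assumed to rely on (Commons Compress, Groovy, Joda-Time, Guava)
import org.apache.commons.compress.archivers.jar.JarArchiveEntry;
import org.apache.commons.compress.archivers.jar.JarArchiveOutputStream;
import org.codehaus.groovy.tools.GroovyClass;
import org.joda.time.DateTime;
import com.google.common.io.Files;
import java.io.File;
import java.io.FileOutputStream;
import java.util.List;

// Derive a unique script name and JAR path from the current time
long dateOfNow = DateTime.now ().getMillis ();
String nameOfScript = String.format ("ScriptOf%d", dateOfNow);
String pathOfJar = String.format ("distributed/fs/path/to/your/groovy-scripts/JarOf%d.jar", dateOfNow);
File archiveFile = new File (pathOfJar);
Files.createParentDirs (archiveFile);

// Compile the script text to bytecode on the driver (compileGroovyScript is not shown here)
List<?> compilationList = compileGroovyScript (nameOfScript, sourceCode);

// Write every compiled class into the JAR; try-with-resources finishes and closes the stream
try (JarArchiveOutputStream oneJar = new JarArchiveOutputStream (new FileOutputStream (archiveFile))) {
    for (Object compileClass : compilationList) {
        GroovyClass groovyClass = (GroovyClass) compileClass;
        byte[] bytecodeOfClass = groovyClass.getBytes ();
        // JAR entries use '/' as the package separator, not '.'
        JarArchiveEntry oneJarEntry = new JarArchiveEntry (groovyClass.getName ().replace ('.', '/') + ".class");
        oneJarEntry.setSize (bytecodeOfClass.length);
        oneJar.putArchiveEntry (oneJarEntry);
        oneJar.write (bytecodeOfClass);
        oneJar.closeArchiveEntry ();
    }
} catch (Exception e) {
    // Fail here rather than hand a half-written JAR to addJar
    throw new IllegalStateException ("Could not write " + pathOfJar, e);
}

// Append the JAR to the execution environment
sparkService.getSparkContext ().addJar (pathOfJar);
// GroovyShell.evaluate (scriptText, nameOfScript) below;
{noformat}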
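For comparison, here is a sketch of the packaging step alone, using only java.util.jar from the JDK instead of Commons Compress. ScriptJarSketch and its methods are hypothetical names, and the byte arrays in main are placeholders rather than real bytecode. It shows how a class name maps to a JAR entry name and reads the entries back to check them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;

final class ScriptJarSketch {
    // "com.example.Script1" -> "com/example/Script1.class"; '$' in closure class names is kept as-is.
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Packs the given classes (name -> bytecode) into an in-memory JAR.
    static byte[] buildJar(Map<String, byte[]> bytecodeByClassName) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (JarOutputStream jar = new JarOutputStream(buffer)) {
            for (Map.Entry<String, byte[]> oneClass : bytecodeByClassName.entrySet()) {
                jar.putNextEntry(new JarEntry(entryNameFor(oneClass.getKey())));
                jar.write(oneClass.getValue());
                jar.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Lists the entry names of a JAR, in the order they were written.
    static List<String> listEntries(byte[] jarBytes) {
        List<String> names = new ArrayList<>();
        try (JarInputStream jar = new JarInputStream(new ByteArrayInputStream(jarBytes))) {
            for (JarEntry entry = jar.getNextJarEntry(); entry != null; entry = jar.getNextJarEntry()) {
                names.add(entry.getName());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, byte[]> classes = new LinkedHashMap<>();
        classes.put("Script1", new byte[] {1, 2, 3});             // placeholder bytes
        classes.put("Script1$_run_closure1", new byte[] {4, 5});  // placeholder bytes
        System.out.println(listEntries(buildJar(classes)));
        // prints "[Script1.class, Script1$_run_closure1.class]"
    }
}
```

The second entry is named after the class the executors report as missing in the issue description, which is why every class produced by the compilation has to go into the JAR and not only the script class itself.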
Any idea on how this can be improved? (e.g. without using the addJar method and without having to dehydrate the Groovy closures)
was (Author: antauri):
So here's the workaround; I wish there were a way to avoid doing it like this. In short, we are:
- taking the script as text and using CompilationUnit (from Groovy) to compile it to bytecode;
- appending each class to the JAR;
- adding the resulting JAR to the Spark context using addJar;
- dehydrating all closures (Closure.dehydrate ()), which comes with its limitations;
Any idea on how this can be improved? (e.g. without using the addJar method and without having to dehydrate the Groovy closures?)
> Support for Groovy closures
> ---------------------------
>
> Key: SPARK-15582
> URL: https://issues.apache.org/jira/browse/SPARK-15582
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output, Java API
> Affects Versions: 1.6.1, 1.6.2, 2.0.0
> Environment: 6 node Debian 8 based Spark cluster
> Reporter: Catalin Alexandru Zamfir
>
> After SPARK-13599 was fixed, we ran one of our jobs against that fix for Groovy dependencies (which it did indeed fix). The Spark executors now get stuck on a ClassNotFoundException when the job runs as a script (via GroovyShell.evaluate (scriptText)). It seems that Spark cannot deserialize the closure, or that the closure is not received by the executor.
> {noformat}
> sparkContext.binaryFiles (ourPath).flatMap ({ onePathEntry -> code-block } as FlatMapFunction).count ();
> // { onePathEntry -> code-block } denotes a Groovy closure.
> {noformat}
> There is a groovy-spark example at https://github.com/bunions1/groovy-spark-example; however, it uses a modified Groovy. If my understanding is correct, Groovy compiles to regular JVM bytecode, so it should be easy for Spark to pick up and use closures.
> The example code above fails with this stack trace:
> {noformat}
> Caused by: java.lang.ClassNotFoundException: Script1$_run_closure1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
> at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> {noformat}
> Any ideas on how to tackle this are welcome. I've tried Googling around for similar issues, but nobody seems to have found a solution.
> At the very least, point me to where to "hack" to make Spark support closures and I'd share some of my time to make it work. SPARK-2171 argues that this is supported out of the box, but for relatively complex projects, where the driver application is part of a bigger application and runs on a cluster, things do not seem to work. I don't know whether SPARK-2171 was tried outside of a local[] cluster set-up, which is where such issues can arise.
> I saw a couple of people trying to make it work, but again, they resort to workarounds (e.g. distributing the bytecode manually before it is needed, or adding a JAR with addJar).
> Can this be done? Where can we look?
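The stack trace in the quoted description ends in Class.forName, called from resolveClass during deserialization. The sketch below reproduces that failure mode with the JDK alone, under the assumption that the executor's class loader simply has no definition for the class generated from the script. ResolveClassSketch, DriverOnlyTask and bareLoader are hypothetical names; DriverOnlyTask stands in for Script1$_run_closure1.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

final class ResolveClassSketch {
    // Stand-in for a class that only the driver has loaded.
    static final class DriverOnlyTask implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // A loader that sees JDK classes only, like an executor that never received the script JAR.
    static ClassLoader bareLoader() {
        return new URLClassLoader(new URL[0], null);
    }

    // Resolves classes through the given loader; returns false when a class cannot be found.
    static boolean canDeserialize(byte[] bytes, ClassLoader loader) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new DriverOnlyTask());
        // The loader that defined the class can resolve it.
        System.out.println(canDeserialize(bytes, ResolveClassSketch.class.getClassLoader())); // prints "true"
        // A loader without the class fails, as in the stack trace.
        System.out.println(canDeserialize(bytes, bareLoader())); // prints "false"
    }
}
```

Under that assumption, the serialized closure does reach the executor; what is missing is the bytecode of the closure's class. That is the gap that approaches such as addJar fill.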
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)