Posted to user@spark.apache.org by Deepak Patankar <pa...@gmail.com> on 2023/04/20 21:43:46 UTC
Dependency injection for Spark executors
I am writing a Spark application that uses Java and Spring Boot to process
rows. For every row it performs some logic and saves data into the database.
The logic is implemented using some services defined in my application and
some external libraries.

While running my service I am getting a NotSerializableException. My Spark
job and service look like this:
// It is a temporary job, which would be removed after testing
public class HelloWorld implements Runnable, Serializable {

    @Autowired
    GraphRequestProcessor graphProcessor;

    @Override
    public void run() {
        String sparkAppName = "hello-job";
        JavaSparkContext sparkCtx = new JavaSparkContext(getSparkConf(sparkAppName));
        JavaRDD<Integer> rdd = sparkCtx.parallelize(Arrays.asList(1, 2, 3, 4));
        rdd.map(new Function<Integer, Object>() {
            @Override
            public Object call(Integer integer) throws Exception {
                System.out.println(integer);
                return integer;
            }
        });
        System.out.println("Done");
    }

    public static SparkConf getSparkConf(String appName) {
        Map<String, String> env = System.getenv();
        String master = env.get("SPARK_MASTER");
        SparkConf sparkConfig = new SparkConf()
                .setAppName(appName)
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "com.example.sparktest.AppKryoRegistrar");
        if (master != null) {
            sparkConfig.setMaster(master);
        } else {
            sparkConfig.setMaster("local[*]");
        }
        return sparkConfig;
    }
}
The graphRequestProcessor is actually used in the task; I am skipping the
call here to keep the example simple. I have registered GraphRequestProcessor
and GraphRequestProcessorImpl in the Kryo registrar, so I assumed
serialization would be handled by Kryo itself.

After running my Spark application, I am getting this error:
Serialization stack:
- object not serializable (class:
com.example.app.dolos.impl.GraphRequestProcessorImpl$$EnhancerBySpringCGLIB$$6fc280f7,
value: com.example.app.dolos.impl.GraphRequestProcessorImpl@4a225014)
- field (class: com.example.app.dolos.job.HelloWorld, name:
graphRequestProcessor, type: interface
com.booking.app.dolos.service.GraphRequestProcessor)
- object (class com.example.app.dolos.job.HelloWorld,
com.example.app.dolos.job.HelloWorld@65448932)
- field (class: com.example.app.dolos.job.HelloWorld$1, name:
this$0, type: class com.example.app.dolos.job.HelloWorld)
- object (class com.example.app.dolos.job.HelloWorld$1,
com.booking.app.dolos.job.HelloWorld$1@1a2b23f2)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name:
capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda,
SerializedLambda[capturingClass=class
org.apache.spark.api.java.JavaPairRDD$,
functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;,
implementation=invokeStatic
org/apache/spark/api/java/JavaPairRDD$.$anonfun$toScalaFunction$1:(Lorg/apache/spark/api/java/function/Function;Ljava/lang/Object;)Ljava/lang/Object;,
instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;,
numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class
org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x0000000800d1e840,
org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x0000000800d1e840@1666e021)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
... 13 more
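To convince myself the problem is closure capture rather than Kryo, I reduced it to a plain-Java reproduction with no Spark involved. (As far as I can tell, spark.serializer only affects shuffled and cached data; task closures always go through Java serialization, which would explain why Kryo does not help here.) GraphProcessor below is just a stand-in for the Spring proxy, and the class names are mine:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Stand-in for the Spring CGLIB proxy: deliberately NOT Serializable.
    static class GraphProcessor {
        int process(int i) { return i * 2; }
    }

    // A serializable functional interface, like Spark's Function.
    interface SerializableFn extends Serializable {
        Integer apply(Integer i);
    }

    // Mirrors HelloWorld: Serializable itself, but holds a non-serializable field.
    static class Job implements Serializable {
        GraphProcessor processor = new GraphProcessor();

        // The anonymous inner class references the outer field, so it captures
        // the enclosing Job instance (this$0). Serializing it therefore tries
        // to serialize Job, which drags in the non-serializable processor.
        SerializableFn capturingFn() {
            return new SerializableFn() {
                @Override
                public Integer apply(Integer i) { return processor.process(i); }
            };
        }
    }

    // A static nested class references nothing from Job, so it serializes fine.
    static class StandaloneFn implements SerializableFn {
        @Override
        public Integer apply(Integer i) { return i; }
    }

    // Returns true if the object survives Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (Exception e) {
            return false;  // e.g. NotSerializableException, as in the trace above
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing serializes: " + serializes(new Job().capturingFn()));
        System.out.println("standalone serializes: " + serializes(new StandaloneFn()));
    }
}
```

Running this prints false for the capturing function and true for the standalone one, which matches the `this$0` entry in my serialization stack.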
I read the answer here
<https://stackoverflow.com/questions/40259196/understanding-sparks-closures-and-their-serialization>
and understood that Java might not be able to serialise my class. But how do
I solve the above issue without making all my classes implement Serializable?
How do we manage serialisation in big Java projects? Is there a way to use
dependency injection frameworks at the Spark executors?
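For context, here is the kind of workaround I have been experimenting with (ServiceRegistry and the other names are placeholders I made up, not real framework APIs): keep the service out of the serialized closure by marking the field transient and re-resolving it lazily on the executor, where a real setup would bootstrap its own Spring context once per JVM. Is this the recommended pattern?

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyInitDemo {

    // Stand-in for the Spring-managed service; deliberately not Serializable.
    static class GraphProcessor {
        int process(int i) { return i * 2; }
    }

    // Hypothetical per-JVM registry. Static state is never serialized, so on a
    // real cluster each executor would lazily build its own context/beans here.
    static class ServiceRegistry {
        private static volatile GraphProcessor instance;

        static GraphProcessor graphProcessor() {
            if (instance == null) {
                synchronized (ServiceRegistry.class) {
                    if (instance == null) instance = new GraphProcessor();
                }
            }
            return instance;
        }
    }

    // The function that would be shipped to executors: Serializable, but the
    // service reference is transient and fetched on first use after deserialization.
    static class ProcessFn implements Serializable {
        private transient GraphProcessor processor;

        int call(int i) {
            if (processor == null) {                      // re-resolve on the executor
                processor = ServiceRegistry.graphProcessor();
            }
            return processor.process(i);
        }
    }

    // Round-trips the function through Java serialization, as Spark would.
    static ProcessFn shipToExecutor(ProcessFn fn) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);                          // succeeds: no bean on board
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ProcessFn) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ProcessFn onExecutor = shipToExecutor(new ProcessFn());
        System.out.println(onExecutor.call(21));          // prints 42
    }
}
```

This serializes cleanly because only the lightweight ProcessFn crosses the wire, but I am not sure how it interacts with @Autowired beans in practice.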