Posted to user@spark.apache.org by Deepak Patankar <pa...@gmail.com> on 2023/04/20 21:43:46 UTC

Dependency injection for Spark executors

I am writing a Spark application which uses Java and Spring Boot to process
rows. For every row it performs some logic and saves data into the database.

The logic is performed using some services defined in my application and
some external libraries.

While running my service, I am getting a NotSerializableException.

My Spark job and service are:

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.springframework.beans.factory.annotation.Autowired;

// It is a temporary job, which will be removed after testing.
public class HelloWorld implements Runnable, Serializable {

    @Autowired
    GraphRequestProcessor graphRequestProcessor;

    @Override
    public void run() {
        String sparkAppName = "hello-job";
        JavaSparkContext sparkCtx = new JavaSparkContext(getSparkConf(sparkAppName));
        JavaRDD<Integer> rdd = sparkCtx.parallelize(Arrays.asList(1, 2, 3, 4));
        // The real job calls graphRequestProcessor here; this is a stripped-down repro.
        rdd.map(new Function<Integer, Object>() {
            @Override
            public Object call(Integer integer) throws Exception {
                System.out.println(integer);
                return integer;
            }
        });
        System.out.println("Done");
    }

    public static SparkConf getSparkConf(String appName) {
        Map<String, String> env = System.getenv();
        String master = env.get("SPARK_MASTER");
        SparkConf sparkConfig = new SparkConf()
            .setAppName(appName)
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "com.example.sparktest.AppKryoRegistrar");
        if (master != null) {
            sparkConfig.setMaster(master);
        } else {
            sparkConfig.setMaster("local[*]");
        }
        return sparkConfig;
    }

}

The graphRequestProcessor is actually used in the task; I am skipping the
call here to keep the example simple. I have registered GraphRequestProcessor
and GraphRequestProcessorImpl in the Kryo registrator. Since I am using Kryo,
I think serialization should be handled by Kryo itself.
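
For reference, the registrator is along these lines (a simplified sketch
assuming the standard org.apache.spark.serializer.KryoRegistrator interface;
imports of my own service classes are omitted):

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class AppKryoRegistrar implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the service interface and its implementation with Kryo.
        kryo.register(GraphRequestProcessor.class);
        kryo.register(GraphRequestProcessorImpl.class);
    }
}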

After running my Spark application, I am getting the following error:

Serialization stack:
    - object not serializable (class: com.example.app.dolos.impl.GraphRequestProcessorImpl$$EnhancerBySpringCGLIB$$6fc280f7, value: com.example.app.dolos.impl.GraphRequestProcessorImpl@4a225014)
    - field (class: com.example.app.dolos.job.HelloWorld, name: graphRequestProcessor, type: interface com.booking.app.dolos.service.GraphRequestProcessor)
    - object (class com.example.app.dolos.job.HelloWorld, com.example.app.dolos.job.HelloWorld@65448932)
    - field (class: com.example.app.dolos.job.HelloWorld$1, name: this$0, type: class com.example.app.dolos.job.HelloWorld)
    - object (class com.example.app.dolos.job.HelloWorld$1, com.booking.app.dolos.job.HelloWorld$1@1a2b23f2)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.api.java.JavaPairRDD$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaPairRDD$.$anonfun$toScalaFunction$1:(Lorg/apache/spark/api/java/function/Function;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x0000000800d1e840, org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x0000000800d1e840@1666e021)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 13 more

I read the answer here
<https://stackoverflow.com/questions/40259196/understanding-sparks-closures-and-their-serialization>
and understood that Java might not be able to serialise my class: the
anonymous Function captures the enclosing HelloWorld instance (this$0 in
the stack above), which drags in the autowired graphRequestProcessor. But
how do I solve the above issue without making all my classes implement
Serializable? How do we manage serialisation for big Java projects? Is
there a way to use dependency injection frameworks on the Spark executors?
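
To make that last question concrete, something along these lines is what I
am hoping for (a hypothetical sketch: SpringContextHolder and process(...)
are placeholders, not existing code):

rdd.map(integer -> {
    // Nothing Spring-managed is captured by the closure; the bean is looked
    // up lazily from an ApplicationContext built (or cached) per executor JVM.
    GraphRequestProcessor processor =
        SpringContextHolder.getBean(GraphRequestProcessor.class);
    processor.process(integer);  // process(...) stands in for the real logic
    return integer;
});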