You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Sergejs Andrejevs <S....@intrum.com> on 2018/09/14 11:33:08 UTC

Spark2 DynamicAllocation doesn't release executors that used cache

Hi,

We're starting to use Spark2 with usecases for Dynamic Allocation.
However, it was noticed it doesn't work as expected when dataset is cached&uncached (persist&unpersist).
The cluster runs with:
CDH 5.15.0
Spark 2.3.0
Oracle Java 8.131

The following configs are passed to spark (as well as setup at cluster):
# Dynamic Allocation
spark.shuffle.service.enabled                                      true
spark.dynamicAllocation.enabled                                    true

spark.dynamicAllocation.schedulerBacklogTimeout                    1
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout           1
spark.dynamicAllocation.executorIdleTimeout                        90

spark.dynamicAllocation.initialExecutors                           1
spark.dynamicAllocation.minExecutors                               1
spark.dynamicAllocation.maxExecutors                               30

Cluster also has these configs enabled, as well as spark_shuffle is setup and YARN application classpath is populated. The executors' storage is freed upon application finish (based on: https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service)
Here is the simplified code that reproduced the issue in our cluster (HA YARN).

When the following code is executed with "cache=false" - the executors are created, used and killed by idle timeout.
When "cache=true" - the executors are created, used, but not killed and they remain hanging.

The storage in both cases was cleaned up.

void run() {

    List<O1> objList = new ArrayList<>();

    for (long i = 0; i < 1000; i++) {

        objList.add(new O1(i, "test"));

    }



    Dataset<O1> ds = sparkSession.createDataset(objList, Encoders.bean(O1.class));

    ds = ds.repartition(4);



    if (cache) {

        ds.persist(StorageLevel.MEMORY_AND_DISK());

        try {

            ds.show(100, false);

        } finally {

            ds.unpersist();

        }

    } else {

        ds.show(100, false);

    }

}



//O1 POJO class:

public class O1 {

    private Long transactionDate;

    private String name;



    public O1() {

    }

    public O1(Long transactionDate, String name) {

        this.transactionDate = transactionDate;

        this.name = name;

    }



    public Long getTransactionDate() {

        return transactionDate;

    }

    public void setTransactionDate(Long transactionDate) {

        this.transactionDate = transactionDate;

    }

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

}

Moreover, when spark.dynamicAllocation.cachedExecutorIdleTimeout is set to some particular time, then the containers are killed successfully (even if they have used cache) (the check was inspired by: https://spark.apache.org/docs/latest/job-scheduling.html#graceful-decommission-of-executors )

Unfortunately, we will have in future containers that keep cache and might live for a long time, as well as containers that free the cache (unpersist) and are expected to be killed (along with idling executors).

Is it a bug or some configuration is missing?

Best regards,
Sergejs Andrejevs

Re: Spark2 DynamicAllocation doesn't release executors that used cache

Posted by Sergejs Andrejevs <S....@intrum.com>.
Has anybody tried dynamic allocation with executors, which use cache?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org