Posted to issues@spark.apache.org by "YuNing Liu (Jira)" <ji...@apache.org> on 2022/05/25 13:17:00 UTC

[jira] [Created] (SPARK-39290) Question of job division in "df.groupBy().applyInPandas"

YuNing Liu created SPARK-39290:
----------------------------------

             Summary: Question of job division in "df.groupBy().applyInPandas"
                 Key: SPARK-39290
                 URL: https://issues.apache.org/jira/browse/SPARK-39290
             Project: Spark
          Issue Type: Question
          Components: PySpark
    Affects Versions: 3.2.1
         Environment: python 3.8

pyspark 3.2.1

pyarrow 7.0.0

hadoop 3.3.2
            Reporter: YuNing Liu


My program runs on Spark on YARN with four nodes in total; both the number of executors and the executor cores are set to 4. When I use "df.groupBy().applyInPandas" to process my DataFrame, the work is always split into two jobs: the first job contains only one task and the second contains three tasks. The two jobs have identical DAGs and perform exactly the same operations, only on different data, so the execution time of my program roughly doubles. I would expect a single job with four tasks. This problem has bothered me for a long time and I cannot find the reason.

The code below is the simple example I use to reproduce it. My DataFrame stores information about image data saved in HDFS and has three columns: "id", "path" and "category". "id" indicates the node on which the image is located, "path" is the file path, and "category" is the image category.
{code:python}
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyhdfs import HdfsClient

spark = SparkSession.builder.appName("test"). \
    config("spark.sql.shuffle.partitions", "40"). \
    config("spark.default.parallelism", "40"). \
    config("spark.sql.execution.arrow.pyspark.enabled", "true"). \
    getOrCreate()


def process(key, paths):
    # Dummy per-group work: count the paths in the group, then burn CPU
    # so the task runtime is easy to see in the Spark UI.
    a = len(paths.path)
    for i in range(100000000):
        a += 1
    a = str(a)
    # Return one row per group: (id, result-as-string), matching the
    # "id long, path string" schema passed to applyInPandas below.
    return pd.DataFrame([key + (a,)])


if __name__ == '__main__':
    sc = spark.sparkContext
    client = HdfsClient(hosts="master:9870", user_name="hadoop")
    dataset_dir = "/Datasets"
    files_pd = pd.DataFrame()
    # Build a pandas DataFrame of (node id, file path, category index)
    # by listing the image files stored in HDFS.
    for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)):
        child_path = os.path.join(dataset_dir, per_dataset_dir)
        files = pd.DataFrame([
            [slave, os.path.join(str(child_path), str(child_dir_name), str(filename)), index]
            for index, child_dir_name in enumerate(client.listdir(child_path))
            for filename in client.listdir(os.path.join(child_path, child_dir_name))])
        files_pd = pd.concat([files_pd, files])
    # Shuffle the rows, convert to a Spark DataFrame, and run the
    # per-group pandas function; there is one group per node id.
    files_pd = files_pd.sample(frac=1).reset_index(drop=True)
    spark_files = spark.createDataFrame(files_pd, ("id", "path", "category"))
    result = spark_files.groupby("id").applyInPandas(process, schema="id long, path string")
    result.show()
 {code}
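
For reference, here is a minimal diagnostic sketch one could run against the same "spark_files" DataFrame to see how the four "id" groups are spread over shuffle partitions before applyInPandas executes. The explicit repartition and the "grouped" name are illustrative additions for inspection only, not part of the reproduction above:
{code:python}
# Diagnostic sketch (assumes the spark_files DataFrame built above).
# It only inspects partitioning; it does not change the reported behaviour.
grouped = spark_files.repartition("id")  # hash-partition by the grouping key

# With spark.sql.shuffle.partitions=40 this should report 40 partitions.
print("number of partitions:", grouped.rdd.getNumPartitions())

# Row count per partition; with only 4 distinct ids, at most 4 partitions
# are expected to be non-empty and the rest empty.
print("rows per partition:", grouped.rdd.glom().map(len).collect())
{code}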



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org