Posted to dev@pig.apache.org by "Xianda Ke (JIRA)" <ji...@apache.org> on 2016/11/23 07:52:58 UTC

[jira] [Commented] (PIG-5068) Set SPARK_REDUCERS by pig.properties not by system configuration

    [ https://issues.apache.org/jira/browse/PIG-5068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689248#comment-15689248 ] 

Xianda Ke commented on PIG-5068:
--------------------------------

[~kellyzly],
1. I agree with you. It would be better to remove the configuration from the system environment.

2. From my point of view, there seems to be no need to add such a simple class that just stores a single integer value.

3. In fact, I have also written a similar function for skewed join. I paste it here for your information.
SkewedJoinConverter.java
{code}
    private int getDefaultParallelism(List<RDD<Tuple>> predRDDs) {

        int parallelism = -1;

        SparkContext sc = predRDDs.get(0).context();
        if (parallelism < 0) {
            if (sc.conf().contains("spark.default.parallelism")) {
                parallelism = sc.defaultParallelism();
            } else {
                // find out max partitions number
                int maxPartitions = -1;
                for (int i = 0; i < predRDDs.size(); i++) {
                    if (predRDDs.get(i).partitions().length > maxPartitions) {
                        maxPartitions = predRDDs.get(i).partitions().length;
                    }
                }
                parallelism = maxPartitions;
            }
        }

        return parallelism;
    }
{code}
This function handles the case where sc.conf().contains("spark.default.parallelism") is false.
I had a look at SparkContext.scala and MesosSchedulerBackend.scala. If the SparkContext's conf does not contain "spark.default.parallelism", defaultParallelism depends on the TaskScheduler; for instance, MesosSchedulerBackend will return 8. It would be better to pick the partition count of the preceding RDDs instead. However, this function does not read the PigContext property "spark.reducers".

From my point of view, it would be better to combine them.
Here is my proposal: a utility function in SparkUtil.java
{code}
    public static int getParallelism(List<RDD<Tuple>> predecessors,
            PhysicalOperator physicalOperator) {

        // NOTE: assumes a PigContext instance named pigContext is accessible here.
        String sparkReducers = pigContext.getProperties().getProperty("spark.reducers");
        if (sparkReducers != null) {
            return Integer.parseInt(sparkReducers);
        }

        int parallelism = physicalOperator.getRequestedParallelism();
        if (parallelism > 0) {
            return parallelism;
        }

        // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
        // is reasonable.
        SparkContext sc = predecessors.get(0).context();
        if (sc.conf().contains("spark.default.parallelism")) {
            parallelism = sc.defaultParallelism();
        } else {
            // find out max partitions number
            int maxPartitions = -1;
            for (int i = 0; i < predecessors.size(); i++) {
                if (predecessors.get(i).partitions().length > maxPartitions) {
                    maxPartitions = predecessors.get(i).partitions().length;
                }
            }
            parallelism = maxPartitions;
        }

        return parallelism;
    }
{code}
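
To make the intended precedence easier to review, here is a minimal, Spark-free sketch of the same decision order: "spark.reducers" from the Pig properties first, then the operator's requested parallelism, then "spark.default.parallelism" if it is set, and finally the largest partition count among the predecessors. The class name ParallelismSketch, the method resolveParallelism and its parameters are hypothetical stand-ins for illustration only; they are not part of Pig or Spark, and plain integers replace the RDD and SparkContext lookups.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical, illustration-only class: not part of Pig or Spark.
class ParallelismSketch {

    /**
     * Resolves parallelism in the order discussed in this comment.
     *
     * @param pigProperties              stand-in for pigContext.getProperties()
     * @param requestedParallelism       stand-in for physicalOperator.getRequestedParallelism()
     * @param sparkDefaultParallelism    value of "spark.default.parallelism", or null if unset
     * @param predecessorPartitionCounts stand-in for each predecessor's partitions().length
     */
    static int resolveParallelism(Properties pigProperties,
                                  int requestedParallelism,
                                  Integer sparkDefaultParallelism,
                                  List<Integer> predecessorPartitionCounts) {
        // 1. An explicit "spark.reducers" property wins over everything else.
        String sparkReducers = pigProperties.getProperty("spark.reducers");
        if (sparkReducers != null) {
            return Integer.parseInt(sparkReducers.trim());
        }

        // 2. Parallelism requested on the operator itself.
        if (requestedParallelism > 0) {
            return requestedParallelism;
        }

        // 3. "spark.default.parallelism", but only when it was actually configured.
        if (sparkDefaultParallelism != null) {
            return sparkDefaultParallelism;
        }

        // 4. Otherwise fall back to the largest partition count among predecessors.
        int maxPartitions = -1;
        for (int count : predecessorPartitionCounts) {
            maxPartitions = Math.max(maxPartitions, count);
        }
        return maxPartitions;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        List<Integer> partitions = Arrays.asList(2, 8, 4);

        // Nothing configured: falls through to the max partition count.
        System.out.println(resolveParallelism(props, -1, null, partitions));

        // "spark.reducers" set: it takes precedence over the other inputs.
        props.setProperty("spark.reducers", "16");
        System.out.println(resolveParallelism(props, 5, 12, partitions));
    }
}
```

Keeping the decision order in one small function like this means the skewed join converter and the other converters could share the same fallback, which is the point of combining the two functions.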



> Set SPARK_REDUCERS by pig.properties not by system configuration
> ----------------------------------------------------------------
>
>                 Key: PIG-5068
>                 URL: https://issues.apache.org/jira/browse/PIG-5068
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>         Attachments: PIG-5068.patch
>
>
> In SparkUtil.java, we set the SPARK_REDUCERS by system configuration
> {code}
>     public static int getParallelism(List<RDD<Tuple>> predecessors,
>             PhysicalOperator physicalOperator) {
>         String numReducers = System.getenv("SPARK_REDUCERS");
>         if (numReducers != null) {
>             return Integer.parseInt(numReducers);
>         }
>         int parallelism = physicalOperator.getRequestedParallelism();
>         if (parallelism <= 0) {
>             // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
>             // is reasonable.
>             parallelism = predecessors.get(0).context().defaultParallelism();
>         }
>         return parallelism;
>     }
> {code}
> It is better to set it by pig.properties



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)