Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2016/10/26 07:59:58 UTC

[jira] [Updated] (PIG-5051) Initialize PigConstants.TASK_INDEX in spark mode correctly

     [ https://issues.apache.org/jira/browse/PIG-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-5051:
----------------------------------
    Description: 
In MR, we initialize PigConstants.TASK_INDEX in org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
{code}
protected void setup(Context context) throws IOException, InterruptedException {
   ...
    context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
...
}
{code}
But Spark does not provide a hook like PigGenericMapReduce.Reduce#setup that runs when a job starts, so we need to find another way to initialize PigConstants.TASK_INDEX correctly.
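One possible direction (a sketch only, not a committed fix): Spark exposes the current partition index to a running task via TaskContext.getPartitionId(), so a converter's per-partition function could copy that value into the task's configuration before any UDF runs. The plain Map standing in for the JobConf and the constant value below are illustrative; only TaskContext.getPartitionId() is assumed Spark API here.

```java
import java.util.HashMap;
import java.util.Map;

public class TaskIndexSketch {
    // Stand-in for PigConstants.TASK_INDEX (illustrative key, not the real constant value).
    static final String TASK_INDEX = "pig.task.index";

    // Hypothetical per-partition initialization: in a real Spark converter the
    // partitionId argument would come from org.apache.spark.TaskContext.getPartitionId().
    static void initializeJobConf(Map<String, String> jobConf, int partitionId) {
        jobConf.put(TASK_INDEX, Integer.toString(partitionId));
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new HashMap<>();
        initializeJobConf(jobConf, 3);
        System.out.println(jobConf.get(TASK_INDEX)); // prints 3
    }
}
```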

After this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in spark mode will be the same as in MR.
Currently TestBuiltin#testUniqueID is split into two cases:
{code}

 @Test
    public void testUniqueID() throws Exception {
     ...
        if (!Util.isSparkExecType(cluster.getExecType())) {
            assertEquals("0-0", iter.next().get(1));
            assertEquals("0-1", iter.next().get(1));
            assertEquals("0-2", iter.next().get(1));
            assertEquals("0-3", iter.next().get(1));
            assertEquals("0-4", iter.next().get(1));
            assertEquals("1-0", iter.next().get(1));
            assertEquals("1-1", iter.next().get(1));
            assertEquals("1-2", iter.next().get(1));
            assertEquals("1-3", iter.next().get(1));
            assertEquals("1-4", iter.next().get(1));
        } else {
            // Because we set PigConstants.TASK_INDEX to 0 in
            // ForEachConverter#ForEachFunction#initializeJobConf,
            // UniqueID.exec() outputs "0-*" for every task.
            // Spark's behavior will not match MR's until PIG-5051 is fixed.
            assertEquals("0-0", iter.next().get(1));
            assertEquals("0-1", iter.next().get(1));
            assertEquals("0-2", iter.next().get(1));
            assertEquals("0-3", iter.next().get(1));
            assertEquals("0-4", iter.next().get(1));
            assertEquals("0-0", iter.next().get(1));
            assertEquals("0-1", iter.next().get(1));
            assertEquals("0-2", iter.next().get(1));
            assertEquals("0-3", iter.next().get(1));
            assertEquals("0-4", iter.next().get(1));
        }
   ...
}
{code}
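To make the expected sequences above concrete, here is a self-contained model (an illustration, not the real UDF code) of how UniqueID builds its ids as taskIndex + "-" + a per-task row counter. With distinct task indexes, as in MR, the ids are 0-0..0-4 then 1-0..1-4; with TASK_INDEX pinned to 0, as in spark mode today, both tasks emit the same 0-* sequence.

```java
import java.util.ArrayList;
import java.util.List;

public class UniqueIdSim {
    // Simplified model of UniqueID.exec(): each task emits taskIndex + "-" + counter.
    // (In the real UDF, taskIndex is read from PigConstants.TASK_INDEX in the job conf.)
    static List<String> run(int[] taskIndexes, int rowsPerTask) {
        List<String> out = new ArrayList<>();
        for (int taskIndex : taskIndexes) {
            for (int seq = 0; seq < rowsPerTask; seq++) {
                out.add(taskIndex + "-" + seq);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // MR: two reduce tasks with distinct task ids -> 0-0..0-4, 1-0..1-4
        System.out.println(run(new int[]{0, 1}, 5));
        // spark mode before PIG-5051: TASK_INDEX hardcoded to 0 -> 0-0..0-4 twice
        System.out.println(run(new int[]{0, 0}, 5));
    }
}
```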

  was:
In MR, we initialize PigConstants.TASK_INDEX in org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
{code}
protected void setup(Context context) throws IOException, InterruptedException {
   ...
    context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
...
}
{code}
But Spark does not provide a hook like PigGenericMapReduce.Reduce#setup that runs when a job starts, so we need to find another way to initialize PigConstants.TASK_INDEX correctly.


> Initialize PigConstants.TASK_INDEX in spark mode correctly
> ----------------------------------------------------------
>
>                 Key: PIG-5051
>                 URL: https://issues.apache.org/jira/browse/PIG-5051
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>             Fix For: spark-branch
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)