You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/30 07:51:38 UTC

svn commit: r1546691 - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/tez/ test/e2e/pig/tests/

Author: cheolsoo
Date: Sat Nov 30 06:51:38 2013
New Revision: 1546691

URL: http://svn.apache.org/r1546691
Log:
PIG-3599: Fix e2e Operator_1, 5, Checkin_3, and Join_1 (cheolsoo)

Modified:
    pig/branches/tez/src/org/apache/pig/PigConfiguration.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/test/e2e/pig/tests/tez.conf

Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Sat Nov 30 06:51:38 2013
@@ -70,7 +70,12 @@ public class PigConfiguration {
 
     public static final String SCHEMA_TUPLE_SHOULD_ALLOW_FORCE = "pig.schematuple.force";
 
-    /*
+    /**
+     * This key defines whether the Tez session (and its application master) is reused across Tez jobs.
+     */
+    public static final String TEZ_SESSION_REUSE = "pig.tez.session.reuse";
+
+    /**
      * Turns off use of combiners in MapReduce jobs produced by Pig.
      */
     public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Sat Nov 30 06:51:38 2013
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.PigConfiguration;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -44,6 +45,7 @@ public class TezJob extends ControlledJo
     private DAGClient dagClient;
     private Map<String, LocalResource> requestAMResources;
     private TezSession tezSession;
+    private boolean reuseSession;
 
     public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
             throws IOException {
@@ -51,6 +53,7 @@ public class TezJob extends ControlledJo
         this.conf = conf;
         this.dag = dag;
         this.requestAMResources = requestAMResources;
+        this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
     }
 
     public DAG getDag() {
@@ -94,8 +97,17 @@ public class TezJob extends ControlledJo
                 }
                 setMessage(sb.toString());
                 TezSessionManager.freeSession(tezSession);
-                tezSession = null;
-                dagClient = null;
+                try {
+                    if (!reuseSession) {
+                        log.info("Shutting down Tez session");
+                        tezSession.stop();
+                    }
+                    tezSession = null;
+                    dagClient = null;
+                } catch (Exception e) {
+                    log.info("Cannot stop Tez session", e);
+                    setJobState(ControlledJob.State.FAILED);
+                }
                 break;
             }
 
@@ -110,8 +122,12 @@ public class TezJob extends ControlledJo
     @Override
     public void killJob() throws IOException {
         try {
-            dagClient.tryKillDAG();
-            tezSession.stop();
+            if (dagClient != null) {
+                dagClient.tryKillDAG();
+            }
+            if (tezSession != null) {
+                tezSession.stop();
+            }
         } catch (TezException e) {
             throw new IOException("Cannot kill DAG - Application Id: " + tezSession.getApplicationId(), e);
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Sat Nov 30 06:51:38 2013
@@ -57,9 +57,8 @@ public class TezJobControlCompiler {
         return tezDag;
     }
 
-    public TezJobControl compile(TezOperPlan tezPlan, String grpName,
-            Configuration conf, TezPlanContainer planContainer)
-                    throws JobCreationException {
+    public TezJobControl compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+            throws JobCreationException {
         int timeToSleep;
         String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "1000";
         String pigJobControlSleep = tezConf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
@@ -81,7 +80,7 @@ public class TezJobControlCompiler {
         try {
             // A single Tez job always pack only 1 Tez plan. We will track 
             // Tez job asynchronously to exploit parallel execution opportunities.
-            TezJob job = getJob(tezPlan, conf, planContainer);
+            TezJob job = getJob(tezPlan, planContainer);
             jobCtrl.addJob(job);
         } catch (JobCreationException jce) {
             throw jce;
@@ -94,7 +93,7 @@ public class TezJobControlCompiler {
         return jobCtrl;
     }
 
-    private TezJob getJob(TezOperPlan tezPlan, Configuration conf, TezPlanContainer planContainer)
+    private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
             throws JobCreationException {
         try {
             Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Sat Nov 30 06:51:38 2013
@@ -81,7 +81,7 @@ public class TezLauncher extends Launche
 
             tezStats.initialize(tezPlan);
 
-            jc = jcc.compile(tezPlan, grpName, conf, tezPlanContainer);
+            jc = jcc.compile(tezPlan, grpName, tezPlanContainer);
             TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
             ((TezJobControl)jc).setJobNotifier(notifier);
             ((TezJobControl)jc).setTezStats(tezStats);

Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Sat Nov 30 06:51:38 2013
@@ -36,22 +36,26 @@ $cfg = {
                 'tests' => [
                         {
                         'num' => 1,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 store a into ':OUTPATH:';\,
                         },
                         {
                         'num' => 2,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k';
 store a into ':OUTPATH:';\,
                         },
                         {
                         'num' => 3,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = filter a by age>30;
 c = group b by name;
 d = foreach c generate group, COUNT(b) as count;
 e = group d by count;
-store e into ':OUTPATH:';\,
+f = foreach e generate group, COUNT(d);
+store f into ':OUTPATH:';\,
                         },
                   ]
                 },
@@ -60,7 +64,8 @@ store e into ':OUTPATH:';\,
                 'tests' => [
                         {
                         'num' => 1,
-                        'pig' => q\register :PIGGYBANKPATH:/piggybank.jar
+                        'pig' => q\set pig.tez.session.reuse false;
+register :PIGGYBANKPATH:/piggybank.jar
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = foreach a generate org.apache.pig.piggybank.evaluation.IsNumeric(name), org.apache.pig.piggybank.evaluation.IsNumeric(age);
 store b into ':OUTPATH:';\,
@@ -68,34 +73,19 @@ store b into ':OUTPATH:';\,
                   ]
                 },
                 {
-                'name' => 'LoaderStorer',
-                'tests' => [
-                        {
-                        'num' => 1,
-                        'pig' => q\register :PIGPATH:/build/ivy/lib/Pig/avro-mapred-1.7.4.jar
-a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
-store a into ':OUTPATH:.intermediate' using AvroStorage();
-exec
-b = load ':OUTPATH:.intermediate' using AvroStorage();
-c = filter b by age>30;
-store c into ':OUTPATH:';\,
-                        'notmq' => 1,
-                        },
-                  ]
-                },
-                {
                 'name' => 'Operators',
                 'tests' => [
                         {
                         'num' => 1,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = limit a 100;
-c = filter b by age>30;
-store c into ':OUTPATH:';\,
+store b into ':OUTPATH:';\,
                         },
                         {
                         'num' => 2,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = foreach a generate age;
 c = distinct b;
 d = filter c by age > 30;
@@ -104,7 +94,8 @@ store d into ':OUTPATH:';\,
                         {
                         # Order by simple
                         'num' => 3,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = order a by age parallel 2;
 store b into ':OUTPATH:';\,
                         'sortArgs' => ['-t', '	', '-k', '2,2'],
@@ -112,7 +103,8 @@ store b into ':OUTPATH:';\,
                         {
                         # Order by simple no schema
                         'num' => 4,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k';
 b = order a by $1;
 store b into ':OUTPATH:';\,
                         'sortArgs' => ['-t', '	', '-k', '2,2'],
@@ -120,13 +112,21 @@ store b into ':OUTPATH:';\,
                         {
                         # Order by after group
                         'num' => 5,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = group a by name;
-c = foreach b generate group, AVG(a.gpa) as avg_gpa;
+c = foreach b generate group, ROUND( AVG(a.gpa) ) as avg_gpa;
 d = order c by avg_gpa parallel 2;
 store d into ':OUTPATH:';\,
                         'sortArgs' => ['-t', '	', '-k', '2,2'],
                         },
+                        {
+                        'num' => 6,
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+b = sample a 1;
+store b into ':OUTPATH:';\,
+                        },
                   ]
                 },
                 {
@@ -134,7 +134,8 @@ store d into ':OUTPATH:';\,
                 'tests' => [
                         {
                         'num' => 1,
-                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+                        'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
 c = filter a by age < 20;
 d = filter b by age < 20;
@@ -142,6 +143,23 @@ e = join c by name, d by name;
 store e into ':OUTPATH:';\,
                         }
                   ]
+                },
+                {
+                'name' => 'LoaderStorer',
+                'tests' => [
+                        {
+                        'num' => 1,
+                        'pig' => q\set pig.tez.session.reuse true;
+register :PIGPATH:/build/ivy/lib/Pig/avro-mapred-1.7.4.jar
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+store a into ':OUTPATH:.intermediate' using AvroStorage();
+exec
+b = load ':OUTPATH:.intermediate' using AvroStorage();
+c = filter b by age>30;
+store c into ':OUTPATH:';\,
+                        'notmq' => 1,
+                        },
+                  ]
                 }
          ]
        }