You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/30 07:51:38 UTC
svn commit: r1546691 - in /pig/branches/tez: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/tez/ test/e2e/pig/tests/
Author: cheolsoo
Date: Sat Nov 30 06:51:38 2013
New Revision: 1546691
URL: http://svn.apache.org/r1546691
Log:
PIG-3599: Fix e2e Operator_1, 5, Checkin_3, and Join_1 (cheolsoo)
Modified:
pig/branches/tez/src/org/apache/pig/PigConfiguration.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/test/e2e/pig/tests/tez.conf
Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Sat Nov 30 06:51:38 2013
@@ -70,7 +70,12 @@ public class PigConfiguration {
public static final String SCHEMA_TUPLE_SHOULD_ALLOW_FORCE = "pig.schematuple.force";
- /*
+ /**
+ * This key is used to define whether to reuse AM in Tez jobs.
+ */
+ public static final String TEZ_SESSION_REUSE = "pig.tez.session.reuse";
+
+ /**
* Turns off use of combiners in MapReduce jobs produced by Pig.
*/
public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Sat Nov 30 06:51:38 2013
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.PigConfiguration;
import org.apache.tez.client.TezSession;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
@@ -44,6 +45,7 @@ public class TezJob extends ControlledJo
private DAGClient dagClient;
private Map<String, LocalResource> requestAMResources;
private TezSession tezSession;
+ private boolean reuseSession;
public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
throws IOException {
@@ -51,6 +53,7 @@ public class TezJob extends ControlledJo
this.conf = conf;
this.dag = dag;
this.requestAMResources = requestAMResources;
+ this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
}
public DAG getDag() {
@@ -94,8 +97,17 @@ public class TezJob extends ControlledJo
}
setMessage(sb.toString());
TezSessionManager.freeSession(tezSession);
- tezSession = null;
- dagClient = null;
+ try {
+ if (!reuseSession) {
+ log.info("Shutting down Tez session");
+ tezSession.stop();
+ }
+ tezSession = null;
+ dagClient = null;
+ } catch (Exception e) {
+ log.info("Cannot stop Tez session", e);
+ setJobState(ControlledJob.State.FAILED);
+ }
break;
}
@@ -110,8 +122,12 @@ public class TezJob extends ControlledJo
@Override
public void killJob() throws IOException {
try {
- dagClient.tryKillDAG();
- tezSession.stop();
+ if (dagClient != null) {
+ dagClient.tryKillDAG();
+ }
+ if (tezSession != null) {
+ tezSession.stop();
+ }
} catch (TezException e) {
throw new IOException("Cannot kill DAG - Application Id: " + tezSession.getApplicationId(), e);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Sat Nov 30 06:51:38 2013
@@ -57,9 +57,8 @@ public class TezJobControlCompiler {
return tezDag;
}
- public TezJobControl compile(TezOperPlan tezPlan, String grpName,
- Configuration conf, TezPlanContainer planContainer)
- throws JobCreationException {
+ public TezJobControl compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+ throws JobCreationException {
int timeToSleep;
String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "1000";
String pigJobControlSleep = tezConf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
@@ -81,7 +80,7 @@ public class TezJobControlCompiler {
try {
// A single Tez job always pack only 1 Tez plan. We will track
// Tez job asynchronously to exploit parallel execution opportunities.
- TezJob job = getJob(tezPlan, conf, planContainer);
+ TezJob job = getJob(tezPlan, planContainer);
jobCtrl.addJob(job);
} catch (JobCreationException jce) {
throw jce;
@@ -94,7 +93,7 @@ public class TezJobControlCompiler {
return jobCtrl;
}
- private TezJob getJob(TezOperPlan tezPlan, Configuration conf, TezPlanContainer planContainer)
+ private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
throws JobCreationException {
try {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Sat Nov 30 06:51:38 2013
@@ -81,7 +81,7 @@ public class TezLauncher extends Launche
tezStats.initialize(tezPlan);
- jc = jcc.compile(tezPlan, grpName, conf, tezPlanContainer);
+ jc = jcc.compile(tezPlan, grpName, tezPlanContainer);
TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
((TezJobControl)jc).setJobNotifier(notifier);
((TezJobControl)jc).setTezStats(tezStats);
Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1546691&r1=1546690&r2=1546691&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Sat Nov 30 06:51:38 2013
@@ -36,22 +36,26 @@ $cfg = {
'tests' => [
{
'num' => 1,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
store a into ':OUTPATH:';\,
},
{
'num' => 2,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k';
store a into ':OUTPATH:';\,
},
{
'num' => 3,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = filter a by age>30;
c = group b by name;
d = foreach c generate group, COUNT(b) as count;
e = group d by count;
-store e into ':OUTPATH:';\,
+f = foreach e generate group, COUNT(d);
+store f into ':OUTPATH:';\,
},
]
},
@@ -60,7 +64,8 @@ store e into ':OUTPATH:';\,
'tests' => [
{
'num' => 1,
- 'pig' => q\register :PIGGYBANKPATH:/piggybank.jar
+ 'pig' => q\set pig.tez.session.reuse false;
+register :PIGGYBANKPATH:/piggybank.jar
a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = foreach a generate org.apache.pig.piggybank.evaluation.IsNumeric(name), org.apache.pig.piggybank.evaluation.IsNumeric(age);
store b into ':OUTPATH:';\,
@@ -68,34 +73,19 @@ store b into ':OUTPATH:';\,
]
},
{
- 'name' => 'LoaderStorer',
- 'tests' => [
- {
- 'num' => 1,
- 'pig' => q\register :PIGPATH:/build/ivy/lib/Pig/avro-mapred-1.7.4.jar
-a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
-store a into ':OUTPATH:.intermediate' using AvroStorage();
-exec
-b = load ':OUTPATH:.intermediate' using AvroStorage();
-c = filter b by age>30;
-store c into ':OUTPATH:';\,
- 'notmq' => 1,
- },
- ]
- },
- {
'name' => 'Operators',
'tests' => [
{
'num' => 1,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = limit a 100;
-c = filter b by age>30;
-store c into ':OUTPATH:';\,
+store b into ':OUTPATH:';\,
},
{
'num' => 2,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = foreach a generate age;
c = distinct b;
d = filter c by age > 30;
@@ -104,7 +94,8 @@ store d into ':OUTPATH:';\,
{
# Order by simple
'num' => 3,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = order a by age parallel 2;
store b into ':OUTPATH:';\,
'sortArgs' => ['-t', ' ', '-k', '2,2'],
@@ -112,7 +103,8 @@ store b into ':OUTPATH:';\,
{
# Order by simple no schema
'num' => 4,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k';
b = order a by $1;
store b into ':OUTPATH:';\,
'sortArgs' => ['-t', ' ', '-k', '2,2'],
@@ -120,13 +112,21 @@ store b into ':OUTPATH:';\,
{
# Order by after group
'num' => 5,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = group a by name;
-c = foreach b generate group, AVG(a.gpa) as avg_gpa;
+c = foreach b generate group, ROUND( AVG(a.gpa) ) as avg_gpa;
d = order c by avg_gpa parallel 2;
store d into ':OUTPATH:';\,
'sortArgs' => ['-t', ' ', '-k', '2,2'],
},
+ {
+ 'num' => 6,
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+b = sample a 1;
+store b into ':OUTPATH:';\,
+ },
]
},
{
@@ -134,7 +134,8 @@ store d into ':OUTPATH:';\,
'tests' => [
{
'num' => 1,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+ 'pig' => q\set pig.tez.session.reuse false;
+a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
c = filter a by age < 20;
d = filter b by age < 20;
@@ -142,6 +143,23 @@ e = join c by name, d by name;
store e into ':OUTPATH:';\,
}
]
+ },
+ {
+ 'name' => 'LoaderStorer',
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => q\set pig.tez.session.reuse true;
+register :PIGPATH:/build/ivy/lib/Pig/avro-mapred-1.7.4.jar
+a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+store a into ':OUTPATH:.intermediate' using AvroStorage();
+exec
+b = load ':OUTPATH:.intermediate' using AvroStorage();
+c = filter b by age>30;
+store c into ':OUTPATH:';\,
+ 'notmq' => 1,
+ },
+ ]
}
]
}