You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/16 21:36:40 UTC
svn commit: r910677 - in /hadoop/pig/branches/branch-0.6: ./
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/tools/grunt/
Author: pradeepkth
Date: Tue Feb 16 20:36:40 2010
New Revision: 910677
URL: http://svn.apache.org/viewvc?rev=910677&view=rev
Log:
PIG-1239: PigContext.connect() should not create a jobClient and jobClient should be created on demand when needed (pradeepkth)
Modified:
hadoop/pig/branches/branch-0.6/CHANGES.txt
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Tue Feb 16 20:36:40 2010
@@ -151,6 +151,9 @@
BUG FIXES
+PIG-1239: PigContext.connect() should not create a jobClient and jobClient
+should be created on demand when needed (pradeepkth)
+
PIG-1213: Schema serialization is broken (pradeepkth)
PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER,
Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Feb 16 20:36:40 2010
@@ -90,7 +90,8 @@
protected DataStorage ds;
- protected JobClient jobClient;
+ @SuppressWarnings("deprecation")
+ protected JobConf jobConf;
// key: the operator key from the logical plan that originated the physical plan
// val: the operator key for the root of the phyisical plan
@@ -110,11 +111,12 @@
this.ds = null;
// to be set in the init method
- this.jobClient = null;
+ this.jobConf = null;
}
- public JobClient getJobClient() {
- return this.jobClient;
+ @SuppressWarnings("deprecation")
+ public JobConf getJobConf() {
+ return this.jobConf;
}
public Map<OperatorKey, MapRedResult> getMaterializedResults() {
@@ -134,6 +136,7 @@
init(this.pigContext.getProperties());
}
+ @SuppressWarnings("deprecation")
public void init(Properties properties) throws ExecException {
//First set the ssh socket factory
setSSHFactory();
@@ -155,13 +158,13 @@
// Now add the settings from "properties" object to override any existing properties
// All of the above is accomplished in the method call below
- JobConf jobConf = new JobConf();
- jobConf.addResource("pig-cluster-hadoop-site.xml");
+ JobConf jc = new JobConf();
+ jc.addResource("pig-cluster-hadoop-site.xml");
//the method below alters the properties object by overriding the
//hadoop properties with the values from properties and recomputing
//the properties
- recomputeProperties(jobConf, properties);
+ recomputeProperties(jc, properties);
configuration = ConfigurationUtil.toConfiguration(properties);
properties = ConfigurationUtil.toProperties(configuration);
@@ -193,15 +196,8 @@
log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
}
- try {
- // Set job-specific configuration knobs
- jobClient = new JobClient(new JobConf(configuration));
- }
- catch (IOException e) {
- int errCode = 6009;
- String msg = "Failed to create job client:" + e.getMessage();
- throw new ExecException(msg, errCode, PigException.BUG, e);
- }
+ // Set job-specific configuration knobs
+ jobConf = new JobConf(configuration);
}
public Properties getConfiguration() throws ExecException {
Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Feb 16 20:36:40 2010
@@ -1878,7 +1878,8 @@
* @throws PlanException
* @throws VisitorException
*/
- protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
+ @SuppressWarnings("deprecation")
+ protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
@@ -2053,7 +2054,7 @@
if(val<=0)
val = pigContext.defaultParallel;
if (val<=0)
- val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks();
+ val = ((JobConf)((HExecutionEngine)eng).getJobConf()).getNumReduceTasks();
if (val<=0)
val = 1;
} catch (Exception e) {
Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Feb 16 20:36:40 2010
@@ -109,7 +109,7 @@
ExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
- JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
+ JobClient jobClient = new JobClient(((HExecutionEngine)exe).getJobConf());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java Tue Feb 16 20:36:40 2010
@@ -49,6 +49,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.JobID;
import org.apache.pig.FuncSpec;
@@ -71,6 +72,7 @@
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
import org.apache.pig.impl.util.LogUtils;
+@SuppressWarnings("deprecation")
public class GruntParser extends PigScriptParser {
private final Log log = LogFactory.getLog(getClass());
@@ -203,13 +205,14 @@
//
ExecutionEngine execEngine = mPigServer.getPigContext().getExecutionEngine();
if (execEngine instanceof HExecutionEngine) {
- mJobClient = ((HExecutionEngine)execEngine).getJobClient();
+ mJobConf = ((HExecutionEngine)execEngine).getJobConf();
}
else {
- mJobClient = null;
+ mJobConf = null;
}
}
+ @Override
public void prompt()
{
if (mInteractive) {
@@ -217,6 +220,7 @@
}
}
+ @Override
protected void quit()
{
mDone = true;
@@ -226,6 +230,7 @@
return mDone;
}
+ @Override
protected void processDescribe(String alias) throws IOException {
if(alias==null) {
alias = mPigServer.getPigContext().getLastAlias();
@@ -233,6 +238,7 @@
mPigServer.dumpSchema(alias);
}
+ @Override
protected void processExplain(String alias, String script, boolean isVerbose,
String format, String target,
List<String> params, List<String> files)
@@ -317,10 +323,12 @@
}
}
+ @Override
protected void printAliases() throws IOException {
mPigServer.printAliases();
}
+ @Override
protected void processRegister(String jar) throws IOException {
mPigServer.registerJar(jar);
}
@@ -344,6 +352,7 @@
return writer.toString();
}
+ @Override
protected void processScript(String script, boolean batch,
List<String> params, List<String> files)
throws IOException, ParseException {
@@ -416,6 +425,7 @@
}
}
+ @Override
protected void processSet(String key, String value) throws IOException, ParseException {
if (key.equals("debug"))
{
@@ -460,6 +470,7 @@
}
}
+ @Override
protected void processCat(String path) throws IOException
{
executeBatch();
@@ -503,6 +514,7 @@
}
}
+ @Override
protected void processCD(String path) throws IOException
{
ContainerDescriptor container;
@@ -534,6 +546,7 @@
}
}
+ @Override
protected void processDump(String alias) throws IOException
{
Iterator<Tuple> result = mPigServer.openIterator(alias);
@@ -544,16 +557,19 @@
}
}
+ @Override
protected void processIllustrate(String alias) throws IOException
{
mPigServer.getExamples(alias);
}
+ @Override
protected void processKill(String jobid) throws IOException
{
- if (mJobClient != null) {
+ if (mJobConf != null) {
+ JobClient jc = new JobClient(mJobConf);
JobID id = JobID.forName(jobid);
- RunningJob job = mJobClient.getJob(id);
+ RunningJob job = jc.getJob(id);
if (job == null)
System.out.println("Job with id " + jobid + " is not active");
else
@@ -564,6 +580,7 @@
}
}
+ @Override
protected void processLS(String path) throws IOException
{
try {
@@ -613,11 +630,13 @@
System.out.println(elem.toString() + "<r " + replication + ">\t" + len);
}
+ @Override
protected void processPWD() throws IOException
{
System.out.println(mDfs.getActiveContainer().toString());
}
+ @Override
protected void printHelp()
{
System.out.println("Commands:");
@@ -636,6 +655,7 @@
System.out.println("quit");
}
+ @Override
protected void processMove(String src, String dst) throws IOException
{
executeBatch();
@@ -655,6 +675,7 @@
}
}
+ @Override
protected void processCopy(String src, String dst) throws IOException
{
executeBatch();
@@ -670,6 +691,7 @@
}
}
+ @Override
protected void processCopyToLocal(String src, String dst) throws IOException
{
executeBatch();
@@ -685,6 +707,7 @@
}
}
+ @Override
protected void processCopyFromLocal(String src, String dst) throws IOException
{
executeBatch();
@@ -700,12 +723,14 @@
}
}
+ @Override
protected void processMkdir(String dir) throws IOException
{
ContainerDescriptor dirDescriptor = mDfs.asContainer(dir);
dirDescriptor.create();
}
+ @Override
protected void processPig(String cmd) throws IOException
{
int start = 1;
@@ -721,6 +746,7 @@
}
}
+ @Override
protected void processRemove(String path, String options ) throws IOException
{
ElementDescriptor dfsPath = mDfs.asElement(path);
@@ -738,6 +764,7 @@
}
}
+ @Override
protected void processFsCommand(String[] cmdTokens) throws IOException{
try {
shell.run(cmdTokens);
@@ -773,7 +800,7 @@
private DataStorage mDfs;
private DataStorage mLfs;
private Properties mConf;
- private JobClient mJobClient;
+ private JobConf mJobConf;
private boolean mDone;
private boolean mLoadOnly;
private ExplainState mExplain;