You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/06/25 19:52:45 UTC
svn commit: r958051 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/executionengine/util/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executi...
Author: rding
Date: Fri Jun 25 17:52:44 2010
New Revision: 958051
URL: http://svn.apache.org/viewvc?rev=958051&view=rev
Log:
PIG-1454: Consider clean up backend code
Removed:
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecPhysicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecPhysicalPlan.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecScopedLogicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
hadoop/pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLargeFile.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPi.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSplitStore.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStoreOld.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jun 25 17:52:44 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1454: Consider clean up backend code (rding)
+
PIG-1333: API interface to Pig (rding)
PIG-1405: Need to move many standard functions from piggybank into Pig
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Jun 25 17:52:44 2010
@@ -677,7 +677,8 @@ public class PigServer {
if (currDAG.isBatchOn()) {
currDAG.execute();
}
- ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext)
+
+ ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
.toString(), BinStorage.class.getName() + "()");
// invocation of "execute" is synchronous!
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Jun 25 17:52:44 2010
@@ -24,7 +24,6 @@ import java.net.SocketException;
import java.net.SocketImplFactory;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
@@ -43,9 +42,6 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.executionengine.util.ExecTools;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
@@ -53,6 +49,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
import org.apache.pig.experimental.logical.optimizer.UidStamper;
@@ -65,7 +62,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
-public class HExecutionEngine implements ExecutionEngine {
+public class HExecutionEngine {
public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
@@ -86,15 +83,12 @@ public class HExecutionEngine implements
// val: the operator key for the root of the physical plan
protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
- protected Map<OperatorKey, ExecPhysicalOperator> physicalOpTable;
-
// map from LOGICAL key to info about the execution
protected Map<OperatorKey, MapRedResult> materializedResults;
public HExecutionEngine(PigContext pigContext) {
this.pigContext = pigContext;
- this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
- this.physicalOpTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
+ this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
this.materializedResults = new HashMap<OperatorKey, MapRedResult>();
this.ds = null;
@@ -111,12 +105,7 @@ public class HExecutionEngine implements
public Map<OperatorKey, MapRedResult> getMaterializedResults() {
return this.materializedResults;
}
-
- public Map<OperatorKey, ExecPhysicalOperator> getPhysicalOpTable() {
- return this.physicalOpTable;
- }
-
-
+
public DataStorage getDataStorage() {
return this.ds;
}
@@ -324,14 +313,9 @@ public class HExecutionEngine implements
}
- public List<ExecJob> submit(PhysicalPlan plan,
- String jobName) throws ExecException {
- throw new UnsupportedOperationException();
- }
-
public void explain(PhysicalPlan plan, PrintStream stream, String format, boolean verbose) {
try {
- ExecTools.checkLeafIsStore(plan, pigContext);
+ MapRedUtil.checkLeafIsStore(plan, pigContext);
MapReduceLauncher launcher = new MapReduceLauncher();
launcher.explain(plan, pigContext, stream, format, verbose);
@@ -340,19 +324,7 @@ public class HExecutionEngine implements
throw new RuntimeException(ve);
}
}
-
- public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
- throw new UnsupportedOperationException();
- }
-
- public Collection<String> activeScopes() throws ExecException {
- throw new UnsupportedOperationException();
- }
-
- public void reclaimScope(String scope) throws ExecException {
- throw new UnsupportedOperationException();
- }
-
+
@SuppressWarnings("unchecked")
private void setSSHFactory(){
Properties properties = this.pigContext.getProperties();
@@ -375,6 +347,7 @@ public class HExecutionEngine implements
* @param conf JobConf with appropriate hadoop resource files
* @param properties Pig properties that will override hadoop properties; properties might be modified
*/
+ @SuppressWarnings("deprecation")
private void recomputeProperties(JobConf jobConf, Properties properties) {
// We need to load the properties from the hadoop configuration
// We want to override these with any existing properties we have.
@@ -418,7 +391,7 @@ public class HExecutionEngine implements
String scope = leaf.getOperatorKey().getScope();
POStore str = new POStore(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
+ spec = new FileSpec(FileLocalizer.getTemporaryPath(
pigContext).toString(),
new FuncSpec(BinStorage.class.getName()));
str.setSFile(spec);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Jun 25 17:52:44 2010
@@ -474,7 +474,7 @@ public class JobControlCompiler{
else { // multi store case
log.info("Setting up multi store job");
String tmpLocationStr = FileLocalizer
- .getTemporaryPath(null, pigContext).toString();
+ .getTemporaryPath(pigContext).toString();
tmpLocation = new Path(tmpLocationStr);
nwJob.setOutputFormatClass(PigOutputFormat.class);
@@ -986,7 +986,7 @@ public class JobControlCompiler{
// DistributedCache
if (shipToCluster) {
Path dst =
- new Path(FileLocalizer.getTemporaryPath(null, pigContext).toString());
+ new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
FileSystem fs = dst.getFileSystem(conf);
fs.copyFromLocalFile(src, dst);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Jun 25 17:52:44 2010
@@ -39,7 +39,6 @@ import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -594,7 +593,7 @@ public class MRCompiler extends PhyPlanV
* @throws IOException
*/
private FileSpec getTempFileSpec() throws IOException {
- return new FileSpec(FileLocalizer.getTemporaryPath(null, pigContext).toString(),
+ return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
new FuncSpec(BinStorage.class.getName()));
}
@@ -2240,13 +2239,13 @@ public class MRCompiler extends PhyPlanV
rpce.setRequestedParallelism(rp);
int val = rp;
if(val<=0){
- ExecutionEngine eng = pigContext.getExecutionEngine();
+ HExecutionEngine eng = pigContext.getExecutionEngine();
if(pigContext.getExecType() != ExecType.LOCAL){
try {
if(val<=0)
val = pigContext.defaultParallel;
if (val<=0)
- val = ((HExecutionEngine)eng).getJobConf().getNumReduceTasks();
+ val = eng.getJobConf().getNumReduceTasks();
if (val<=0)
val = 1;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Jun 25 17:52:44 2010
@@ -44,7 +44,6 @@ import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
@@ -115,10 +114,10 @@ public class MapReduceLauncher extends L
aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
- ExecutionEngine exe = pc.getExecutionEngine();
+ HExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
- JobClient jobClient = new JobClient(((HExecutionEngine)exe).getJobConf());
+ JobClient jobClient = new JobClient(exe.getJobConf());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Jun 25 17:52:44 2010
@@ -1490,7 +1490,7 @@ public class LogToPhyTranslationVisitor
physOp.setAlias(split.getAlias());
FileSpec splStrFile;
try {
- splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(null, pc).toString(),new FuncSpec(BinStorage.class.getName()));
+ splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(BinStorage.class.getName()));
} catch (IOException e1) {
byte errSrc = pc.getErrorSource();
int errCode = 0;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Jun 25 17:52:44 2010
@@ -32,13 +32,24 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
@@ -121,6 +132,32 @@ public class MapRedUtil {
udfc.addJobConf(job);
udfc.deserialize();
}
+
+ public static FileSpec checkLeafIsStore(
+ PhysicalPlan plan,
+ PigContext pigContext) throws ExecException {
+ try {
+ PhysicalOperator leaf = plan.getLeaves().get(0);
+ FileSpec spec = null;
+ if(!(leaf instanceof POStore)){
+ String scope = leaf.getOperatorKey().getScope();
+ POStore str = new POStore(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ spec = new FileSpec(FileLocalizer.getTemporaryPath(
+ pigContext).toString(),
+ new FuncSpec(BinStorage.class.getName()));
+ str.setSFile(spec);
+ plan.addAsLeaf(str);
+ } else{
+ spec = ((POStore)leaf).getSFile();
+ }
+ return spec;
+ } catch (Exception e) {
+ int errCode = 2045;
+ String msg = "Internal error. Not able to check if the leaf node is a store operator.";
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
/**
* Get all files recursively from the given list of files
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Fri Jun 25 17:52:44 2010
@@ -386,6 +386,7 @@ implements LoadCaster, StoreFuncInterfac
return null;
}
+ @SuppressWarnings("deprecation")
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Fri Jun 25 17:52:44 2010
@@ -889,7 +889,7 @@ public class LogToPhyTranslationVisitor
physOp.setAlias(loSplit.getAlias());
FileSpec splStrFile;
try {
- splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(null, pc).toString(),new FuncSpec(BinStorage.class.getName()));
+ splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(BinStorage.class.getName()));
} catch (IOException e1) {
byte errSrc = pc.getErrorSource();
int errCode = 0;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Fri Jun 25 17:52:44 2010
@@ -47,10 +47,8 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.streaming.ExecutableManager;
@@ -75,9 +73,6 @@ public class PigContext implements Seria
//one of: local, mapreduce, pigbody
private ExecType execType;;
- // configuration for connecting to hadoop
- private Properties conf = new Properties();
-
// extra jar files that are needed to run a job
transient public List<URL> extraJars = new LinkedList<URL>();
@@ -91,10 +86,8 @@ public class PigContext implements Seria
transient private DataStorage lfs;
// handle to the back-end
- transient private ExecutionEngine executionEngine;
+ transient private HExecutionEngine executionEngine;
- private String jobName = JOB_NAME_PREFIX; // can be overwritten by users
-
private Properties properties;
/**
@@ -301,7 +294,7 @@ public class PigContext implements Seria
srcElement.copy(dstElement, this.properties, false);
}
- public ExecutionEngine getExecutionEngine() {
+ public HExecutionEngine getExecutionEngine() {
return executionEngine;
}
@@ -412,8 +405,8 @@ public class PigContext implements Seria
//return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
return new URLClassLoader(urls, PigContext.class.getClassLoader());
}
-
-
+
+ @SuppressWarnings("unchecked")
public static Class resolveClassName(String name) throws IOException{
for(String prefix: packageImportList) {
@@ -502,9 +495,9 @@ public class PigContext implements Seria
public static Object instantiateFuncFromSpec(String funcSpec) {
return instantiateFuncFromSpec(new FuncSpec(funcSpec));
- }
-
+ }
+ @SuppressWarnings("unchecked")
public Class getClassForAlias(String alias) throws IOException{
String className = null;
FuncSpec funcSpec = null;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Fri Jun 25 17:52:44 2010
@@ -35,6 +35,8 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -44,6 +46,7 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HPath;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.impl.PigContext;
@@ -281,7 +284,9 @@ public class FileLocalizer {
* @param storage The DataStorage object used to open the fileSpec
* @return InputStream to the fileSpec
* @throws IOException
+ * @deprecated Use {@link #open(String, PigContext)} instead
*/
+ @Deprecated
static public InputStream open(String fileName, ExecType execType, DataStorage storage) throws IOException {
fileName = checkDefaultPrefix(execType, fileName);
if (!fileName.startsWith(LOCAL_PREFIX)) {
@@ -295,6 +300,10 @@ public class FileLocalizer {
}
}
+ /**
+ * @deprecated Use {@link #fullPath(String, PigContext)} instead
+ */
+ @Deprecated
public static String fullPath(String fileName, DataStorage storage) {
String fullPath;
try {
@@ -489,6 +498,10 @@ public class FileLocalizer {
setInitialized(false);
}
+ /**
+ * @deprecated Use {@link #getTemporaryPath(PigContext)} instead
+ */
+ @Deprecated
public static synchronized ElementDescriptor
getTemporaryPath(ElementDescriptor relative,
PigContext pigContext) throws IOException {
@@ -504,6 +517,18 @@ public class FileLocalizer {
return elem;
}
+ public static Path getTemporaryPath(PigContext pigContext) throws IOException {
+ ElementDescriptor relative = relativeRoot(pigContext);
+
+ if (!relativeRoot(pigContext).exists()) {
+ relativeRoot(pigContext).create();
+ }
+ ElementDescriptor elem=
+ pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt());
+ toDelete().push(elem);
+ return ((HPath)elem).getPath();
+ }
+
public static String hadoopify(String filename, PigContext pigContext) throws IOException {
if (filename.startsWith(LOCAL_PREFIX)) {
filename = filename.substring(LOCAL_PREFIX.length());
@@ -556,6 +581,10 @@ public class FileLocalizer {
return fileExists(filename, context.getFs());
}
+ /**
+ * @deprecated Use {@link #fileExists(String, PigContext)} instead
+ */
+ @Deprecated
public static boolean fileExists(String filename, DataStorage store)
throws IOException {
ElementDescriptor elem = store.asElement(filename);
@@ -567,6 +596,10 @@ public class FileLocalizer {
return !isDirectory(filename, context.getDfs());
}
+ /**
+ * @deprecated Use {@link #isFile(String, PigContext)} instead
+ */
+ @Deprecated
public static boolean isFile(String filename, DataStorage store)
throws IOException {
return !isDirectory(filename, store);
@@ -577,6 +610,10 @@ public class FileLocalizer {
return isDirectory(filename, context.getDfs());
}
+ /**
+ * @deprecated Use {@link #isDirectory(String, PigContext)} instead.
+ */
+ @Deprecated
public static boolean isDirectory(String filename, DataStorage store)
throws IOException {
ElementDescriptor elem = store.asElement(filename);
Modified: hadoop/pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java Fri Jun 25 17:52:44 2010
@@ -18,7 +18,6 @@
package org.apache.pig.pen;
-import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,7 +31,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
@@ -52,8 +50,6 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.LineageTracer;
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Jun 25 17:52:44 2010
@@ -31,7 +31,6 @@ import java.io.PrintStream;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
-import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@@ -44,7 +43,6 @@ import jline.ConsoleReaderInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FsShell;
-
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
@@ -55,7 +53,6 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
@@ -198,13 +195,8 @@ public class GruntParser extends PigScri
// the back end to kill a given job (mJobClient is used only in
// processKill)
//
- ExecutionEngine execEngine = mPigServer.getPigContext().getExecutionEngine();
- if (execEngine instanceof HExecutionEngine) {
- mJobConf = ((HExecutionEngine)execEngine).getJobConf();
- }
- else {
- mJobConf = null;
- }
+ HExecutionEngine execEngine = mPigServer.getPigContext().getExecutionEngine();
+ mJobConf = execEngine.getJobConf();
}
@Override
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Fri Jun 25 17:52:44 2010
@@ -34,7 +34,6 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -79,8 +78,9 @@ public class TestCounters {
//counting the no. of bytes in the output file
//long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
- "output_map_only", pigServer.getPigContext()),
- pigServer.getPigContext());
+ "output_map_only", pigServer.getPigContext()), pigServer
+ .getPigContext());
+
long filesize = 0;
while(is.read() != -1) filesize++;
@@ -131,6 +131,7 @@ public class TestCounters {
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
"output_map_only", pigServer.getPigContext()),
pigServer.getPigContext());
+
long filesize = 0;
while(is.read() != -1) filesize++;
@@ -184,10 +185,12 @@ public class TestCounters {
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
+
ExecJob job = pigServer.store("c", "output");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
+
long filesize = 0;
while(is.read() != -1) filesize++;
@@ -357,11 +360,13 @@ public class TestCounters {
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
+
ExecJob job = pigServer.store("c", "output", "BinStorage");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
+
long filesize = 0;
while(is.read() != -1) filesize++;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java Fri Jun 25 17:52:44 2010
@@ -38,6 +38,7 @@ import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
@@ -393,7 +394,7 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pigServer.getPigContext()).toString();
+ String tmpOutputFile = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -433,7 +434,6 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 10;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
@@ -442,8 +442,6 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -476,7 +474,6 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 10;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
@@ -485,8 +482,6 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -524,14 +519,11 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 20;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
ps.println(i);
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -540,7 +532,7 @@ public class TestEvalPipelineLocal exten
if(!iter.hasNext()) fail("No output found");
int numIdentity = 0;
while(iter.hasNext()){
- Tuple t = iter.next();
+ iter.next();
++numIdentity;
}
assertEquals(5, numIdentity);
@@ -775,7 +767,6 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 2;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
@@ -784,8 +775,6 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -870,7 +859,6 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 10;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
@@ -879,8 +867,6 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -916,7 +902,6 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 10;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
@@ -925,8 +910,6 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -967,7 +950,6 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 10;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
@@ -976,8 +958,6 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
@@ -1012,7 +992,6 @@ public class TestEvalPipelineLocal exten
int LOOP_COUNT = 2;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
@@ -1021,8 +1000,6 @@ public class TestEvalPipelineLocal exten
}
ps.close();
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
- pigServer.getPigContext()).toString();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Fri Jun 25 17:52:44 2010
@@ -17,67 +17,33 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
-import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.util.ConfigurationValidator;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.test.utils.GenPhyOp;
-import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.LogicalPlanTester;
-import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -477,7 +443,7 @@ public class TestJobSubmission extends j
}
}
- ExecutionEngine exe = pc.getExecutionEngine();
+ HExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
@@ -500,7 +466,7 @@ public class TestJobSubmission extends j
pp.addAsLeaf(store);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- ExecutionEngine exe = pc.getExecutionEngine();
+ HExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
@@ -526,7 +492,7 @@ public class TestJobSubmission extends j
pp.addAsLeaf(store);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- ExecutionEngine exe = pc.getExecutionEngine();
+ HExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
@@ -559,7 +525,7 @@ public class TestJobSubmission extends j
pp.addAsLeaf(store);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- ExecutionEngine exe = pc.getExecutionEngine();
+ HExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
JobControlCompiler jcc = new JobControlCompiler(pc, conf);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLargeFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLargeFile.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLargeFile.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLargeFile.java Fri Jun 25 17:52:44 2010
@@ -103,7 +103,7 @@ public class TestLargeFile extends TestC
throw ioe;
}
fileName = "'" + FileLocalizer.hadoopify(datFile.toString(), pig.getPigContext()) + "'";
- tmpFile1 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
+ tmpFile1 = "'" + FileLocalizer.getTemporaryPath(pig.getPigContext()).toString() + "'";
datFile.delete();
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Fri Jun 25 17:52:44 2010
@@ -31,7 +31,6 @@ import junit.framework.Assert;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.util.ExecTools;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -39,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -1472,7 +1472,7 @@ public class TestMultiQueryCompiler {
System.out.println("===== check map-reduce plan =====");
- ExecTools.checkLeafIsStore(pp, myPig.getPigContext());
+ MapRedUtil.checkLeafIsStore(pp, myPig.getPigContext());
MapReduceLauncher launcher = new MapReduceLauncher();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPi.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPi.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPi.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPi.java Fri Jun 25 17:52:44 2010
@@ -114,7 +114,7 @@ public class TestPi extends TestCase {
}
fileName = "'" + FileLocalizer.hadoopify(datFile.toString(), pig.getPigContext()) + "'";
- tmpFile1 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
+ tmpFile1 = "'" + FileLocalizer.getTemporaryPath(pig.getPigContext()).toString() + "'";
datFile.delete();
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSplitStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSplitStore.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSplitStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSplitStore.java Fri Jun 25 17:52:44 2010
@@ -75,8 +75,8 @@ public class TestSplitStore extends Test
pig.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
- pig.store("A1", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
- pig.store("A2", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ pig.store("A1", "'" + FileLocalizer.getTemporaryPath(pigContext) + "'");
+ pig.store("A2", "'" + FileLocalizer.getTemporaryPath(pigContext) + "'");
}
@Test
@@ -85,7 +85,7 @@ public class TestSplitStore extends Test
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
pig.openIterator("A1");
- pig.store("A2", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ pig.store("A2", "'" + FileLocalizer.getTemporaryPath(pigContext) + "'");
}
@Test
@@ -94,7 +94,7 @@ public class TestSplitStore extends Test
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
pig.openIterator("A2");
- pig.store("A1", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ pig.store("A1", "'" + FileLocalizer.getTemporaryPath(pigContext) + "'");
}
@Test
@@ -102,7 +102,7 @@ public class TestSplitStore extends Test
pig.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
- pig.store("A1", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ pig.store("A1", "'" + FileLocalizer.getTemporaryPath(pigContext) + "'");
pig.openIterator("A2");
}
@@ -111,7 +111,7 @@ public class TestSplitStore extends Test
pig.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
- pig.store("A2", "'" + FileLocalizer.getTemporaryPath(null, pigContext) + "'");
+ pig.store("A2", "'" + FileLocalizer.getTemporaryPath(pigContext) + "'");
pig.openIterator("A1");
}
@@ -121,7 +121,7 @@ public class TestSplitStore extends Test
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
pig.openIterator("A1");
- pig.registerQuery("Store A2 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ pig.registerQuery("Store A2 into '" + FileLocalizer.getTemporaryPath(pigContext) + "';");
}
@Test
@@ -130,7 +130,7 @@ public class TestSplitStore extends Test
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
pig.openIterator("A2");
- pig.registerQuery("Store A1 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ pig.registerQuery("Store A1 into '" + FileLocalizer.getTemporaryPath(pigContext) + "';");
}
@Test
@@ -138,7 +138,7 @@ public class TestSplitStore extends Test
pig.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
- pig.registerQuery("Store A1 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ pig.registerQuery("Store A1 into '" + FileLocalizer.getTemporaryPath(pigContext) + "';");
pig.openIterator("A2");
}
@@ -147,7 +147,7 @@ public class TestSplitStore extends Test
pig.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pig.getPigContext()) + "';");
pig.registerQuery("Split A into A1 if $0<=10, A2 if $0>10;");
- pig.registerQuery("Store A2 into '" + FileLocalizer.getTemporaryPath(null, pigContext) + "';");
+ pig.registerQuery("Store A2 into '" + FileLocalizer.getTemporaryPath(pigContext) + "';");
pig.openIterator("A1");
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStoreOld.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStoreOld.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStoreOld.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStoreOld.java Fri Jun 25 17:52:44 2010
@@ -123,8 +123,8 @@ public class TestStoreOld extends TestCa
pw.close();
pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
fileName = "'" + FileLocalizer.hadoopify(f.toString(), pig.getPigContext()) + "'";
- tmpFile1 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
- tmpFile2 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
+ tmpFile1 = "'" + FileLocalizer.getTemporaryPath(pig.getPigContext()).toString() + "'";
+ tmpFile2 = "'" + FileLocalizer.getTemporaryPath(pig.getPigContext()).toString() + "'";
f.delete();
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=958051&r1=958050&r2=958051&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Fri Jun 25 17:52:44 2010
@@ -774,7 +774,7 @@ public class GenPhyOp{
}
private static FileSpec getTempFileSpec() throws IOException {
- return new FileSpec(FileLocalizer.getTemporaryPath(null, pc).toString(),new FuncSpec(BinStorage.class.getName()));
+ return new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(BinStorage.class.getName()));
}
public static POSplit topSplitOp() throws IOException{