You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/01/08 22:13:48 UTC
svn commit: r1056801 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/PigServer.java test/org/apache/pig/test/TestPigServer.java
Author: dvryaboy
Date: Sat Jan 8 21:13:46 2011
New Revision: 1056801
URL: http://svn.apache.org/viewvc?rev=1056801&view=rev
Log:
PIG-1675: allow PigServer to register pig script from InputStream
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/test/org/apache/pig/test/TestPigServer.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1056801&r1=1056800&r2=1056801&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jan 8 21:13:46 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1675: allow PigServer to register pig script from InputStream (zjffdu via dvryaboy)
+
PIG-1479: Embed Pig in scripting languages (rding)
PIG-946: Combiner optimizer does not optimize when limit follow group, foreach (thejas)
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1056801&r1=1056800&r2=1056801&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Sat Jan 8 21:13:46 2011
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.StringReader;
@@ -116,20 +117,20 @@ import org.apache.pig.tools.pigstats.Pig
/**
- *
+ *
* A class for Java programs to connect to Pig. Typically a program will create a PigServer
* instance. The programmer then registers queries using registerQuery() and
* retrieves results using openIterator() or store(). After doing so, the
* shutdown() method should be called to free any resources used by the current
* PigServer instance. Not doing so could result in a memory leak.
- *
+ *
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class PigServer {
-
+
private final Log log = LogFactory.getLog(getClass());
-
+
/**
* Given a string, determine the exec type.
* @param str accepted values are 'local', 'mapreduce', and 'mapred'
@@ -137,44 +138,54 @@ public class PigServer {
*/
public static ExecType parseExecType(String str) throws IOException {
String normStr = str.toLowerCase();
-
- if (normStr.equals("local")) return ExecType.LOCAL;
- if (normStr.equals("mapreduce")) return ExecType.MAPREDUCE;
- if (normStr.equals("mapred")) return ExecType.MAPREDUCE;
- if (normStr.equals("pig")) return ExecType.PIG;
- if (normStr.equals("pigbody")) return ExecType.PIG;
-
+
+ if (normStr.equals("local")) {
+ return ExecType.LOCAL;
+ }
+ if (normStr.equals("mapreduce")) {
+ return ExecType.MAPREDUCE;
+ }
+ if (normStr.equals("mapred")) {
+ return ExecType.MAPREDUCE;
+ }
+ if (normStr.equals("pig")) {
+ return ExecType.PIG;
+ }
+ if (normStr.equals("pigbody")) {
+ return ExecType.PIG;
+ }
+
int errCode = 2040;
String msg = "Unknown exec type: " + str;
throw new PigException(msg, errCode, PigException.BUG);
}
/*
- * The data structure to support grunt shell operations.
- * The grunt shell can only work on one graph at a time.
+ * The data structure to support grunt shell operations.
+ * The grunt shell can only work on one graph at a time.
* If a script is contained inside another script, the grunt
- * shell first saves the current graph on the stack and works
- * on a new graph. After the nested script is done, the grunt
+ * shell first saves the current graph on the stack and works
+ * on a new graph. After the nested script is done, the grunt
* shell pops up the saved graph and continues working on it.
*/
- private Stack<Graph> graphs = new Stack<Graph>();
-
+ private final Stack<Graph> graphs = new Stack<Graph>();
+
/*
* The current Graph the grunt shell is working on.
*/
private Graph currDAG;
-
- private PigContext pigContext;
-
+
+ private final PigContext pigContext;
+
private static int scopeCounter = 0;
- private String scope = constructScope();
+ private final String scope = constructScope();
private boolean aggregateWarning = true;
private boolean isMultiQuery = true;
-
+
private String constructScope() {
// scope servers for now as a session id
-
+
// String user = System.getProperty("user.name", "DEFAULT_USER_ID");
// String date = (new Date()).toString();
@@ -185,9 +196,9 @@ public class PigServer {
// operators to not include scope in their name().
return ""+(++scopeCounter);
}
-
+
/**
- * @param execTypeString can be 'mapreduce' or 'local'. Local mode will
+ * @param execTypeString can be 'mapreduce' or 'local'. Local mode will
* use Hadoop's local job runner to execute the job on the local machine.
* Mapreduce mode will connect to a cluster to execute the job.
* @throws ExecException
@@ -196,9 +207,9 @@ public class PigServer {
public PigServer(String execTypeString) throws ExecException, IOException {
this(parseExecType(execTypeString));
}
-
+
/**
- * @param execType execution type to start the engine. Local mode will
+ * @param execType execution type to start the engine. Local mode will
* use Hadoop's local job runner to execute the job on the local machine.
* Mapreduce mode will connect to a cluster to execute the job.
* @throws ExecException
@@ -210,25 +221,25 @@ public class PigServer {
public PigServer(ExecType execType, Properties properties) throws ExecException {
this(new PigContext(execType, properties));
}
-
+
public PigServer(PigContext context) throws ExecException {
this(context, true);
}
-
+
public PigServer(PigContext context, boolean connect) throws ExecException {
this.pigContext = context;
currDAG = new Graph(false);
-
+
aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
if (connect) {
pigContext.connect();
}
-
+
addJarsFromProperties();
}
-
+
private void addJarsFromProperties() throws ExecException {
//add jars from properties to extraJars
String jar_str = pigContext.getProperties().getProperty("pig.additional.jars");
@@ -238,7 +249,7 @@ public class PigServer {
registerJar(jar);
} catch (IOException e) {
int errCode = 4010;
- String msg =
+ String msg =
"Failed to register jar :" + jar + ". Caught exception.";
throw new ExecException(
msg,
@@ -254,7 +265,7 @@ public class PigServer {
public PigContext getPigContext(){
return pigContext;
}
-
+
/**
* Set the logging level to DEBUG.
*/
@@ -262,7 +273,7 @@ public class PigServer {
Logger.getLogger("org.apache.pig").setLevel(Level.DEBUG);
pigContext.getLog4jProperties().setProperty("log4j.logger.org.apache.pig", Level.DEBUG.toString());
}
-
+
/**
* Set the logging level to the default.
*/
@@ -270,7 +281,7 @@ public class PigServer {
Logger.getLogger("org.apache.pig").setLevel(pigContext.getDefaultLogLevel());
pigContext.getLog4jProperties().setProperty("log4j.logger.org.apache.pig", pigContext.getDefaultLogLevel().toString());
}
-
+
/**
* Set the default parallelism for this job
* @param p default number of reducers to use for this job.
@@ -278,13 +289,13 @@ public class PigServer {
public void setDefaultParallel(int p) {
pigContext.defaultParallel = p;
}
-
+
/**
* Starts batch execution mode.
*/
public void setBatchOn() {
log.debug("Create a new graph.");
-
+
if (currDAG != null) {
graphs.push(currDAG);
}
@@ -293,7 +304,7 @@ public class PigServer {
/**
* Retrieve the current execution mode.
- *
+ *
* @return true if the execution mode is batch; false otherwise.
*/
public boolean isBatchOn() {
@@ -320,8 +331,8 @@ public class PigServer {
}
/**
- * Submits a batch of Pig commands for execution.
- *
+ * Submits a batch of Pig commands for execution.
+ *
* @return list of jobs being executed
* @throws FrontendException
* @throws ExecException
@@ -334,7 +345,7 @@ public class PigServer {
while (iter.hasNext()) {
JobStats js = iter.next();
for (OutputStats output : js.getOutputs()) {
- if (js.isSuccessful()) {
+ if (js.isSuccessful()) {
jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, output
.getPOStore(), output.getAlias(), stats));
} else {
@@ -359,13 +370,13 @@ public class PigServer {
String msg = "setBatchOn() must be called first.";
throw new FrontendException(msg, errCode, PigException.INPUT);
}
-
+
return currDAG.execute();
}
-
+
/**
* Discards a batch of Pig commands.
- *
+ *
* @throws FrontendException
*/
public void discardBatch() throws FrontendException {
@@ -374,50 +385,51 @@ public class PigServer {
String msg = "setBatchOn() must be called first.";
throw new FrontendException(msg, errCode, PigException.INPUT);
}
-
+
currDAG = graphs.pop();
}
-
+
/**
- * Add a path to be skipped while automatically shipping binaries for
+ * Add a path to be skipped while automatically shipping binaries for
* streaming.
- *
+ *
* @param path path to be skipped
*/
public void addPathToSkip(String path) {
pigContext.addPathToSkip(path);
}
-
+
/**
* Defines an alias for the given function spec. This
- * is useful for functions that require arguments to the
+ * is useful for functions that require arguments to the
* constructor.
- *
+ *
* @param function - the new function alias to define.
* @param functionSpec - the name of the function and any arguments.
* It should have the form: classname('arg1', 'arg2', ...)
* @deprecated Use {@link #registerFunction(String, FuncSpec)}
*/
+ @Deprecated
public void registerFunction(String function, String functionSpec) {
registerFunction(function, new FuncSpec(functionSpec));
}
-
+
/**
* Defines an alias for the given function spec. This
- * is useful for functions that require arguments to the
+ * is useful for functions that require arguments to the
* constructor.
- *
+ *
* @param function - the new function alias to define.
- * @param funcSpec - the FuncSpec object representing the name of
+ * @param funcSpec - the FuncSpec object representing the name of
* the function class and any arguments to constructor.
*/
public void registerFunction(String function, FuncSpec funcSpec) {
pigContext.registerFunction(function, funcSpec);
}
-
+
/**
* Defines an alias for the given streaming command.
- *
+ *
* @param commandAlias - the new command alias to define
* @param command - streaming command to be executed
*/
@@ -428,66 +440,66 @@ public class PigServer {
private URL locateJarFromResources(String jarName) throws IOException {
Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
URL resourceLocation = null;
-
+
if (urls.hasMoreElements()) {
resourceLocation = urls.nextElement();
}
-
+
if (urls.hasMoreElements()) {
StringBuffer sb = new StringBuffer("Found multiple resources that match ");
sb.append(jarName);
sb.append(": ");
sb.append(resourceLocation);
-
+
while (urls.hasMoreElements()) {
sb.append(urls.nextElement());
sb.append("; ");
}
-
+
log.debug(sb.toString());
}
-
+
return resourceLocation;
}
-
+
/**
- * Registers a jar file. Name of the jar file can be an absolute or
+ * Registers a jar file. Name of the jar file can be an absolute or
* relative path.
- *
+ *
* If multiple resources are found with the specified name, the
* first one is registered as returned by getSystemResources.
* A warning is issued to inform the user.
- *
+ *
* @param name of the jar file to register
* @throws IOException
*/
public void registerJar(String name) throws IOException {
// first try to locate jar via system resources
- // if this fails, try by using "name" as File (this preserves
- // compatibility with case when user passes absolute path or path
- // relative to current working directory.)
+ // if this fails, try by using "name" as File (this preserves
+ // compatibility with case when user passes absolute path or path
+ // relative to current working directory.)
if (name != null) {
URL resource = locateJarFromResources(name);
if (resource == null) {
File f = FileLocalizer.fetchFile(pigContext.getProperties(), name).file;
-
+
if (!f.canRead()) {
int errCode = 4002;
String msg = "Can't read jar file: " + name;
throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
}
-
+
resource = f.toURI().toURL();
}
- pigContext.addJar(resource);
+ pigContext.addJar(resource);
}
}
-
+
/**
* Universal Scripting Language Support, see PIG-928
- *
+ *
* @param path path of the script file
* @param scriptingLang language keyword or scriptingEngine used to interpret the script
* @param namespace namespace defined for functions of this script
@@ -509,21 +521,21 @@ public class PigServer {
}
pigContext.addScriptFile(path);
}
-
+
/**
* Register a query with the Pig runtime. The query is parsed and registered, but it is not
* executed until it is needed.
- *
+ *
* @param query
* a Pig Latin expression to be evaluated.
* @param startLine
* line number of the query within the whole script
* @throws IOException
- */
- public void registerQuery(String query, int startLine) throws IOException {
+ */
+ public void registerQuery(String query, int startLine) throws IOException {
currDAG.registerQuery(query, startLine);
}
-
+
public Graph getClonedGraph() throws IOException {
Graph graph = currDAG.clone();
@@ -534,58 +546,73 @@ public class PigServer {
}
return graph;
}
-
+
/**
* Register a query with the Pig runtime. The query is parsed and registered, but it is not
* executed until it is needed. Equivalent to calling {@link #registerQuery(String, int)}
* with startLine set to 1.
- *
+ *
* @param query
* a Pig Latin expression to be evaluated.
* @throws IOException
- */
+ */
public void registerQuery(String query) throws IOException {
registerQuery(query, 1);
}
-
+
/**
- * Register a query with the Pig runtime. The query will be read from the indicated file.
- * @param fileName file to read query from.
+ * Register a pig script from an InputStream source, which is more general and extensible.
+ * The pig script can be in a local file, in which case you can use a FileInputStream;
+ * it can be built dynamically in memory, in which case you can use a ByteArrayInputStream;
+ * or it can even reside on a remote machine, in which case you can wrap it in a SocketInputStream.
+ * @param in InputStream from which to read the pig script
* @throws IOException
*/
- public void registerScript(String fileName) throws IOException {
- registerScript(fileName, null, null);
+ public void registerScript(InputStream in) throws IOException{
+ registerScript(in, null, null);
}
-
+
/**
- * Register a pig script file. The parameters in the file will be substituted with the values in params
- * @param fileName pig script file
- * @param params the key is the parameter name, and the value is the parameter value
+ * Register a pig script from an InputStream source, which is more general and extensible.
+ * The pig script can be in a local file, in which case you can use a FileInputStream;
+ * it can be built dynamically in memory, in which case you can use a ByteArrayInputStream;
+ * or it can even reside on a remote machine, in which case you can wrap it in a SocketInputStream.
+ * The parameters in the pig script will be substituted with the values in params.
+ * @param in InputStream from which to read the pig script
+ * @param params the key is the parameter name, and the value is the parameter value
* @throws IOException
*/
- public void registerScript(String fileName, Map<String,String> params) throws IOException {
- registerScript(fileName, params, null);
+ public void registerScript(InputStream in, Map<String,String> params) throws IOException{
+ registerScript(in, params, null);
}
/**
- * Register a pig script file. The parameters in the file will be substituted with the values in the parameter files
- * @param fileName pig script file
+ * Register a pig script from an InputStream source, which is more general and extensible.
+ * The pig script can be in a local file, in which case you can use a FileInputStream;
+ * it can be built dynamically in memory, in which case you can use a ByteArrayInputStream;
+ * or it can even reside on a remote machine, in which case you can wrap it in a SocketInputStream.
+ * The parameters in the pig script will be substituted with the values in the parameter files.
+ * @param in InputStream from which to read the pig script
* @param paramsFiles files which have the parameter setting
* @throws IOException
*/
- public void registerScript(String fileName, List<String> paramsFiles) throws IOException {
- registerScript(fileName, null, paramsFiles);
+ public void registerScript(InputStream in, List<String> paramsFiles) throws IOException {
+ registerScript(in, null, paramsFiles);
}
-
+
/**
- * Register a pig script file. The parameters in the file will be substituted with the values in the map and the parameter files
+ * Register a pig script from InputStream.<br>
+ * The pig script can be in a local file, in which case you can use a FileInputStream;
+ * it can be built dynamically in memory, in which case you can use a ByteArrayInputStream;
+ * or it can even reside on a remote machine, in which case you can wrap it in a SocketInputStream.<br>
+ * The parameters in the pig script will be substituted with the values in the map and the parameter files.
* The values in params Map will override the value in parameter file if they have the same parameter
- * @param fileName pig script
- * @param params the key is the parameter name, and the value is the parameter value
- * @param paramsFiles files which have the parameter setting
+ * @param in InputStream from which to read the pig script
+ * @param params the key is the parameter name, and the value is the parameter value
+ * @param paramsFiles files which have the parameter setting
* @throws IOException
*/
- public void registerScript(String fileName, Map<String,String> params,List<String> paramsFiles) throws IOException {
+ public void registerScript(InputStream in, Map<String,String> params,List<String> paramsFiles) throws IOException {
try {
// transform the map type to list type which can been accepted by ParameterSubstitutionPreprocessor
List<String> paramList = new ArrayList<String>();
@@ -594,22 +621,19 @@ public class PigServer {
paramList.add(entry.getKey()+"="+entry.getValue());
}
}
-
+
// do parameter substitution
ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
StringWriter writer = new StringWriter();
- psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(new FileInputStream(fileName))),
- writer,
- paramList.size() > 0 ? paramList.toArray(new String[0]) : null,
+ psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(in)),
+ writer,
+ paramList.size() > 0 ? paramList.toArray(new String[0]) : null,
paramsFiles!=null ? paramsFiles.toArray(new String[0]) : null);
-
+
GruntParser grunt = new GruntParser(new StringReader(writer.toString()));
grunt.setInteractive(false);
grunt.setParams(this);
grunt.parseStopOnError(true);
- } catch (FileNotFoundException e) {
- log.error(e.getLocalizedMessage());
- throw new IOException(e.getCause());
} catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
log.error(e.getLocalizedMessage());
throw new IOException(e.getCause());
@@ -618,6 +642,61 @@ public class PigServer {
throw new IOException(e.getCause());
}
}
+
+ /**
+ * Register a query with the Pig runtime. The query will be read from the indicated file.
+ * @param fileName file to read query from.
+ * @throws IOException
+ */
+ public void registerScript(String fileName) throws IOException {
+ registerScript(fileName, null, null);
+ }
+
+ /**
+ * Register a pig script file. The parameters in the file will be substituted with the values in params
+ * @param fileName pig script file
+ * @param params the key is the parameter name, and the value is the parameter value
+ * @throws IOException
+ */
+ public void registerScript(String fileName, Map<String,String> params) throws IOException {
+ registerScript(fileName, params, null);
+ }
+
+
+
+ /**
+ * Register a pig script file. The parameters in the file will be substituted with the values in the parameter files
+ * @param fileName pig script file
+ * @param paramsFiles files which have the parameter setting
+ * @throws IOException
+ */
+ public void registerScript(String fileName, List<String> paramsFiles) throws IOException {
+ registerScript(fileName, null, paramsFiles);
+ }
+
+ /**
+ * Register a pig script file. The parameters in the file will be substituted with the values in the map and the parameter files
+ * The values in params Map will override the value in parameter file if they have the same parameter
+ * @param fileName pig script
+ * @param params the key is the parameter name, and the value is the parameter value
+ * @param paramsFiles files which have the parameter setting
+ * @throws IOException
+ */
+ public void registerScript(String fileName, Map<String,String> params,List<String> paramsFiles) throws IOException {
+ FileInputStream fis = null;
+ try{
+ fis = new FileInputStream(fileName);
+ registerScript(fis, params, paramsFiles);
+ }catch (FileNotFoundException e){
+ log.error(e.getLocalizedMessage());
+ throw new IOException(e.getCause());
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ }
+
/**
* Intended to be used by unit tests only.
* Print a list of all aliases in in the current Pig Latin script. Output is written to
@@ -645,16 +724,19 @@ public class PigServer {
break;
}
}
- if (schema != null) System.out.println(alias + ": " + schema.toString());
- else System.out.println("Schema for " + alias + " unknown.");
+ if (schema != null) {
+ System.out.println(alias + ": " + schema.toString());
+ } else {
+ System.out.println("Schema for " + alias + " unknown.");
+ }
return schema;
} catch (FrontendException fee) {
int errCode = 1001;
- String msg = "Unable to describe schema for alias " + alias;
+ String msg = "Unable to describe schema for alias " + alias;
throw new FrontendException (msg, errCode, PigException.INPUT, false, null, fee);
}
}
-
+
/**
* Write the schema for a nestedAlias to System.out. Denoted by alias::nestedAlias.
* @param alias Alias whose schema has nestedAlias
@@ -671,7 +753,7 @@ public class PigServer {
}
else {
int errCode = 1001;
- String msg = "Unable to describe schema for " + alias + "::" + nestedAlias;
+ String msg = "Unable to describe schema for " + alias + "::" + nestedAlias;
throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
}
}
@@ -683,7 +765,7 @@ public class PigServer {
public void setJobName(String name){
currDAG.setJobName(name);
}
-
+
/**
* Set Hadoop job priority. This value will get translated to mapred.job.priority.
* @param priority valid values are found in {@link org.apache.hadoop.mapred.JobPriority}
@@ -725,10 +807,10 @@ public class PigServer {
if (currDAG.isBatchOn()) {
currDAG.execute();
}
-
+
ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
.toString(), Utils.getTmpFileCompressorName(pigContext) + "()");
-
+
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
@@ -739,7 +821,7 @@ public class PigServer {
Exception e = job.getException();
int errCode = 1066;
String msg = "Unable to open iterator for alias " + id +
- ". Backend error : " + e.getMessage();
+ ". Backend error : " + e.getMessage();
throw new FrontendException(msg, errCode, PigException.INPUT, e);
} else {
throw new IOException("Job terminated with anomalous status "
@@ -751,11 +833,11 @@ public class PigServer {
}
catch (Exception e) {
int errCode = 1066;
- String msg = "Unable to open iterator for alias " + id ;
+ String msg = "Unable to open iterator for alias " + id ;
throw new FrontendException(msg, errCode, PigException.INPUT, e);
}
}
-
+
/**
* Executes a Pig Latin script up to and including indicated alias and stores the resulting
* records into a file. That is, if a user does:
@@ -775,7 +857,7 @@ public class PigServer {
* </pre>
* filtered and sorted data will be stored to the file <tt>bar</tt>.
* Equivalent to calling {@link #store(String, String, String)} with
- * <tt>org.apache.pig.PigStorage</tt> as the store function.
+ * <tt>org.apache.pig.PigStorage</tt> as the store function.
* @param id The alias to store
* @param filename The file to which to store to
* @return {@link ExecJob} containing information about this job
@@ -784,7 +866,7 @@ public class PigServer {
public ExecJob store(String id, String filename) throws IOException {
return store(id, filename, PigStorage.class.getName() + "()"); // SFPig is the default store function
}
-
+
/**
* Executes a Pig Latin script up to and including indicated alias and stores the resulting
* records into a file. That is, if a user does:
@@ -812,7 +894,7 @@ public class PigServer {
* @return {@link ExecJob} containing information about this job
* @throws IOException
*/
- public ExecJob store(String id, String filename, String func)
+ public ExecJob store(String id, String filename, String func)
throws IOException {
PigStats stats = storeEx(id, filename, func);
if (stats.getOutputStats().size() < 1) {
@@ -826,19 +908,20 @@ public class PigServer {
}else{
HJob job = new HJob(JOB_STATUS.FAILED, pigContext,
output.getPOStore(), output.getAlias(), stats);
-
+
//check for exception
Exception ex = null;
for(JobStats js : stats.getJobGraph()){
- if(js.getException() != null)
+ if(js.getException() != null) {
ex = js.getException();
+ }
}
job.setException(ex);
return job;
}
}
-
+
private PigStats storeEx(
String id,
String filename,
@@ -853,7 +936,7 @@ public class PigServer {
// MRCompiler needs a store to be the leaf - hence
// add a store to the plan to explain
-
+
// figure out the leaf to which the store needs to be added
List<LogicalOperator> leaves = lp.getLeaves();
LogicalOperator leaf = null;
@@ -862,24 +945,25 @@ public class PigServer {
} else {
for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
LogicalOperator leafOp = it.next();
- if(leafOp.getAlias().equals(id))
+ if(leafOp.getAlias().equals(id)) {
leaf = leafOp;
+ }
}
}
-
+
LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(
scope, lp, filename, func, leaf, leaf.getAlias(),
pigContext);
LogicalPlan storePlan = compileLp(unCompiledstorePlan, g, true);
-
+
return executeCompiledLogicalPlan(storePlan);
} catch (PigException e) {
int errCode = 1002;
String msg = "Unable to store alias " + id;
throw new PigException(msg, errCode, PigException.INPUT, e);
- }
+ }
}
-
+
/**
* Provide information on how a pig query will be executed. For now
* this information is very developer focussed, and probably not very
@@ -898,7 +982,7 @@ public class PigServer {
* @param alias Name of alias to explain.
* @param format Format in which the explain should be printed. If text, then the plan will
* be printed in plain text. Otherwise, the execution plan will be printed in
- * <a href="http://en.wikipedia.org/wiki/DOT_language">DOT</a> format.
+ * <a href="http://en.wikipedia.org/wiki/DOT_language">DOT</a> format.
* @param verbose Controls the amount of information printed
* @param markAsExecute When set will treat the explain like a
* call to execute in the respoect that all the pending stores are
@@ -931,7 +1015,7 @@ public class PigServer {
LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
migrator.visit();
org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
-
+
HashSet<String> optimizerRules = null;
try {
optimizerRules = (HashSet<String>) ObjectSerializer
@@ -942,10 +1026,10 @@ public class PigServer {
String msg = "Unable to deserialize optimizer rules.";
throw new FrontendException(msg, errCode, PigException.BUG, ioe);
}
-
+
LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(newPlan, 3, optimizerRules);
- optimizer.optimize();
-
+ optimizer.optimize();
+
newPlan.explain(lps, format, verbose);
}
pp.explain(pps, format, verbose);
@@ -967,29 +1051,29 @@ public class PigServer {
* not take into account a replication factor, as that can vary from file
* to file. Thus if you are using this to determine if you data set will fit
* in the HDFS, you need to divide the result of this call by your specific replication
- * setting.
+ * setting.
* @return unused byte capacity of the file system.
* @throws IOException
*/
public long capacity() throws IOException {
if (pigContext.getExecType() == ExecType.LOCAL) {
throw new IOException("capacity only supported for non-local execution");
- }
+ }
else {
DataStorage dds = pigContext.getDfs();
-
+
Map<String, Object> stats = dds.getStatistics();
String rawCapacityStr = (String) stats.get(DataStorage.RAW_CAPACITY_KEY);
String rawUsedStr = (String) stats.get(DataStorage.RAW_USED_KEY);
-
+
if ((rawCapacityStr == null) || (rawUsedStr == null)) {
throw new IOException("Failed to retrieve capacity stats");
}
-
+
long rawCapacityBytes = new Long(rawCapacityStr).longValue();
long rawUsedBytes = new Long(rawUsedStr).longValue();
-
+
return rawCapacityBytes - rawUsedBytes;
}
}
@@ -1010,7 +1094,7 @@ public class PigServer {
return length * replication;
}
-
+
/**
* Test whether a file exists.
* @param filename to test
@@ -1021,7 +1105,7 @@ public class PigServer {
ElementDescriptor elem = pigContext.getDfs().asElement(filename);
return elem.exists();
}
-
+
/**
* Delete a file.
* @param filename to delete
@@ -1033,7 +1117,7 @@ public class PigServer {
elem.delete();
return true;
}
-
+
/**
* Rename a file.
* @param source file to rename
@@ -1045,7 +1129,7 @@ public class PigServer {
pigContext.rename(source, target);
return true;
}
-
+
/**
* Make a directory.
* @param dirs directory to make
@@ -1057,8 +1141,8 @@ public class PigServer {
container.create();
return true;
}
-
- /**
+
+ /**
* List the contents of a directory.
* @param dir name of directory to list
* @return array of strings, one for each file name
@@ -1068,16 +1152,16 @@ public class PigServer {
Collection<String> allPaths = new ArrayList<String>();
ContainerDescriptor container = pigContext.getDfs().asContainer(dir);
Iterator<ElementDescriptor> iter = container.iterator();
-
+
while (iter.hasNext()) {
ElementDescriptor elem = iter.next();
allPaths.add(elem.toString());
}
-
+
String[] type = new String[1];
return allPaths.toArray(type);
}
-
+
/**
* Does not work at the moment.
*/
@@ -1086,7 +1170,7 @@ public class PigServer {
// return MapReduceLauncher.totalHadoopTimeSpent;
return 0L;
}
-
+
/**
* Return a map containing the logical plan associated with each alias.
* @return map
@@ -1128,7 +1212,7 @@ public class PigServer {
public Map<Operator, DataBag> getExamples(String alias) throws IOException {
LogicalPlan plan = null;
- try {
+ try {
if (currDAG.isBatchOn() && alias != null) {
currDAG.execute();
}
@@ -1157,11 +1241,11 @@ public class PigServer {
private LogicalPlan getStorePlan(String alias) throws IOException {
Graph g = getClonedGraph();
LogicalPlan lp = g.getPlan(alias);
-
+
if (!isBatchOn() || alias != null) {
// MRCompiler needs a store to be the leaf - hence
// add a store to the plan to explain
-
+
// figure out the leaves to which stores need to be added
List<LogicalOperator> leaves = lp.getLeaves();
LogicalOperator leaf = null;
@@ -1170,20 +1254,21 @@ public class PigServer {
} else {
for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
LogicalOperator leafOp = it.next();
- if(leafOp.getAlias().equals(alias))
+ if(leafOp.getAlias().equals(alias)) {
leaf = leafOp;
+ }
}
}
-
- lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
+
+ lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
PigStorage.class.getName(), leaf, "fake", pigContext);
}
-
+
compileLp(lp, g, true);
-
+
return lp;
}
-
+
private PigStats execute(String alias) throws FrontendException, ExecException {
LogicalPlan typeCheckedLp = compileLp(alias);
@@ -1199,7 +1284,7 @@ public class PigServer {
return executeCompiledLogicalPlan(typeCheckedLp);
}
-
+
private PigStats executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException, FrontendException {
// discover pig features used in this script
ScriptState.get().setScriptFeatures(compiledLp);
@@ -1234,12 +1319,12 @@ public class PigServer {
private LogicalPlan compileLp(
String alias,
boolean optimize) throws FrontendException {
-
+
// create a clone of the logical plan and give it
// to the operations below
LogicalPlan lpClone;
Graph g;
-
+
try {
g = getClonedGraph();
lpClone = g.getPlan(alias);
@@ -1250,7 +1335,7 @@ public class PigServer {
}
return compileLp(lpClone, g, optimize);
}
-
+
private void mergeScalars(LogicalPlan lp, Graph g) throws FrontendException {
// When we start processing a store we look for scalars to add stores
// to respective logical plans and temporary files to the attributes
@@ -1269,13 +1354,13 @@ public class PigServer {
LogicalPlan referredPlan = g.getAliases().get(g.getAliasOp().get(alias));
- // If referredPlan already has a store,
+ // If referredPlan already has a store,
// we just use it instead of adding one from our pocket
store = referredPlan.getLeaves().get(0);
- if(store instanceof LOStore
+ if(store instanceof LOStore
&&
((LOStore)store).getOutputFile().getFuncName().equals(
- InterStorage.class.getName())
+ InterStorage.class.getName())
) {
// use this store
fileSpec = ((LOStore)store).getOutputFile();
@@ -1299,9 +1384,10 @@ public class PigServer {
innerPlan.add(rconst);
innerPlan.connect(rconst, scalarEntry.getKey());
-
- if (lp.getSoftLinkSuccessors(store)==null || !lp.getSoftLinkSuccessors(store).contains(scalarEntry.getValue().second))
+
+ if (lp.getSoftLinkSuccessors(store)==null || !lp.getSoftLinkSuccessors(store).contains(scalarEntry.getValue().second)) {
lp.createSoftLink(store, scalarEntry.getValue().second);
+ }
}
} catch (IOException ioe) {
int errCode = 2219;
@@ -1309,28 +1395,28 @@ public class PigServer {
throw new FrontendException(msg, errCode, PigException.BUG, ioe);
}
}
-
+
private LogicalPlan compileLp(LogicalPlan lp, Graph g, boolean optimize) throws FrontendException {
mergeScalars(lp, g);
-
+
return compileLp(lp, optimize);
}
-
+
@SuppressWarnings("unchecked")
private LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
FrontendException {
// Set the logical plan values correctly in all the operators
PlanSetter ps = new PlanSetter(lp);
ps.visit();
-
+
UnionOnSchemaSetter setUnionOnSchema = new UnionOnSchemaSetter(lp, pigContext);
setUnionOnSchema.visit();
-
+
// run through validator
CompilationMessageCollector collector = new CompilationMessageCollector() ;
boolean isBeforeOptimizer = true;
validate(lp, collector, isBeforeOptimizer);
-
+
// optimize
if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("false")) {
HashSet<String> optimizerRules = null;
@@ -1346,16 +1432,16 @@ public class PigServer {
LogicalOptimizer optimizer = new LogicalOptimizer(lp, pigContext.getExecType(), optimizerRules);
optimizer.optimize();
-
+
// compute whether output data is sorted or not
SortInfoSetter sortInfoSetter = new SortInfoSetter(lp);
sortInfoSetter.visit();
-
+
// run validations to be done after optimization
isBeforeOptimizer = false;
validate(lp, collector, isBeforeOptimizer);
}
-
+
return lp;
}
@@ -1372,16 +1458,16 @@ public class PigServer {
boolean isBeforeOptimizer) throws FrontendException {
FrontendException caught = null;
try {
- LogicalPlanValidationExecutor validator =
+ LogicalPlanValidationExecutor validator =
new LogicalPlanValidationExecutor(lp, pigContext, isBeforeOptimizer);
validator.validate(lp, collector);
} catch (FrontendException fe) {
// Need to go through and see what the collector has in it. But
// remember what we've caught so we can wrap it into what we
// throw.
- caught = fe;
+ caught = fe;
}
-
+
if(aggregateWarning) {
CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
} else {
@@ -1389,7 +1475,7 @@ public class PigServer {
CompilationMessageCollector.logAllMessages(collector, log);
}
}
-
+
if (caught != null) {
throw caught;
}
@@ -1408,10 +1494,10 @@ public class PigServer {
int errCode = 1005;
String msg = "No plan for " + alias + " to " + operation;
throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
- }
+ }
return lp;
}
-
+
public static class SortInfoSetter extends LOVisitor{
public SortInfoSetter(LogicalPlan plan) {
@@ -1420,14 +1506,14 @@ public class PigServer {
@Override
protected void visit(LOStore store) throws VisitorException {
-
+
LogicalOperator storePred = store.getPlan().getPredecessors(store).get(0);
if(storePred == null){
int errCode = 2051;
String msg = "Did not find a predecessor for Store." ;
- throw new VisitorException(msg, errCode, PigException.BUG);
+ throw new VisitorException(msg, errCode, PigException.BUG);
}
-
+
SortInfo sortInfo = null;
if(storePred instanceof LOLimit) {
storePred = store.getPlan().getPredecessors(storePred).get(0);
@@ -1443,8 +1529,9 @@ public class PigServer {
Object value = ((LOConst)root).getValue();
if (value instanceof Boolean && (Boolean)value==true) {
LogicalOperator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
- if (split instanceof LOSplit)
+ if (split instanceof LOSplit) {
storePred = store.getPlan().getPredecessors(split).get(0);
+ }
}
}
}
@@ -1466,38 +1553,38 @@ public class PigServer {
* This class holds the internal states of a grunt shell session.
*/
private class Graph {
-
- private Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
-
- private Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
-
- private Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
-
- private List<String> scriptCache = new ArrayList<String>();
+
+ private final Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+
+ private final Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
+
+ private final Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+
+ private final List<String> scriptCache = new ArrayList<String>();
// the fileNameMap contains filename to canonical filename
// mappings. This is done so we can reparse the cached script
// and remember the translation (current directory might only
// be correct during the first parse
private Map<String, String> fileNameMap = new HashMap<String, String>();
-
- private Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, LogicalPlan>();
-
- private Set<LOLoad> loadOps = new HashSet<LOLoad>();
+
+ private final Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, LogicalPlan>();
+
+ private final Set<LOLoad> loadOps = new HashSet<LOLoad>();
private String jobName;
-
+
private String jobPriority;
- private boolean batchMode;
+ private final boolean batchMode;
private int processedStores;
private int ignoreNumStores;
-
+
private LogicalPlan lp;
-
- Graph(boolean batchMode) {
+
+ Graph(boolean batchMode) {
this.batchMode = batchMode;
this.processedStores = 0;
this.ignoreNumStores = 0;
@@ -1505,25 +1592,25 @@ public class PigServer {
PigContext.JOB_NAME_PREFIX+":DefaultJobName");
this.lp = new LogicalPlan();
};
-
+
Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
-
+
Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
-
+
Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
-
+
List<String> getScriptCache() { return scriptCache; }
-
+
boolean isBatchOn() { return batchMode; };
boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
-
+
PigStats execute() throws ExecException, FrontendException {
pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
if (jobPriority != null) {
pigContext.getProperties().setProperty(PigContext.JOB_PRIORITY, jobPriority);
}
-
+
PigStats stats = PigServer.this.execute(null);
processedStores = storeOpTable.keySet().size();
return stats;
@@ -1543,7 +1630,7 @@ public class PigServer {
LogicalPlan getPlan(String alias) throws IOException {
LogicalPlan plan = lp;
-
+
if (alias != null) {
LogicalOperator op = aliasOp.get(alias);
if(op == null) {
@@ -1557,14 +1644,14 @@ public class PigServer {
}
void registerQuery(String query, int startLine) throws IOException {
-
+
LogicalPlan tmpLp = parseQuery(query, startLine);
-
+
// store away the query for use in cloning later
scriptCache.add(query);
if (tmpLp.getLeaves().size() == 1) {
LogicalOperator op = tmpLp.getSingleLeafPlanOutputOp();
-
+
// Check if we just processed a LOStore i.e. STORE
if (op instanceof LOStore) {
@@ -1596,17 +1683,17 @@ public class PigServer {
}
}
}
- }
-
- LogicalPlan parseQuery(String query, int startLine) throws IOException {
- if (query == null || query.length() == 0) {
+ }
+
+ LogicalPlan parseQuery(String query, int startLine) throws IOException {
+ if (query == null || query.length() == 0) {
int errCode = 1084;
String msg = "Invalid Query: Query is null or of size 0";
throw new FrontendException(msg, errCode, PigException.INPUT);
}
query = query.trim();
-
+
try {
return new LogicalPlanBuilder(PigServer.this.pigContext).parse(scope, query,
aliases, opTable, aliasOp, startLine, fileNameMap);
@@ -1623,7 +1710,7 @@ public class PigServer {
// There are two choices on how we clone the logical plan
// 1 - we really clone each operator and connect up the cloned operators
// 2 - we cache away the script till the point we need to clone
- // and then simply re-parse the script.
+ // and then simply re-parse the script.
// The latter approach is used here
// FIXME: There is one open issue with this now:
// Consider the following script:
@@ -1637,16 +1724,16 @@ public class PigServer {
// checks for file existence of files in the load
// in the case where the file is a local one -i.e. with file: prefix
// This will be a known issue now and we will need to revisit later
-
+
// parse each line of the cached script
int lineNumber = 1;
-
- // create data structures needed for parsing
+
+ // create data structures needed for parsing
Graph graph = new Graph(isBatchOn());
graph.ignoreNumStores = processedStores;
graph.processedStores = processedStores;
graph.fileNameMap = fileNameMap;
-
+
try {
for (Iterator<String> it = getScriptCache().iterator(); it.hasNext(); lineNumber++) {
if (isBatchOn()) {
@@ -1659,17 +1746,17 @@ public class PigServer {
} catch (IOException ioe) {
ioe.printStackTrace();
graph = null;
- }
+ }
return graph;
}
-
+
private void postProcess() throws IOException {
-
+
// Set the logical plan values correctly in all the operators
PlanSetter ps = new PlanSetter(lp);
ps.visit();
-
- // The following code deals with store/load combination of
+
+ // The following code deals with store/load combination of
// intermediate files. In this case we will replace the load operator
// with a (implicit) split operator, iff the load/store
// func is reversible (because that's when we can safely
@@ -1696,11 +1783,11 @@ public class PigServer {
String msg = "Failed to connect store with dependent load.";
throw new FrontendException(msg, errCode, ex);
}
-
-
+
+
//TODO
- //if the load has a schema then the type cast inserter has to introduce
+ //if the load has a schema then the type cast inserter has to introduce
//casts to get the right types. Since the type cast inserter runs later,
//removing the load could create problems. For example, if the storage function
//does not preserve type information required and the subsequent load created
@@ -1712,15 +1799,15 @@ public class PigServer {
//type information is preserved. Similarly, the load functions should support
//a similar interface. With these interfaces in place, the code below can be
//used to optimize the store/load combination
-
- /*
+
+ /*
LoadFunc lFunc = (LoadFunc) pigContext.instantiateFuncFromSpec(load.getInputFile().getFuncSpec());
StoreFunc sFunc = (StoreFunc) pigContext.instantiateFuncFromSpec(store.getOutputFile().getFuncSpec());
if (lFunc.getClass() == sFunc.getClass() && lFunc instanceof ReversibleLoadStoreFunc) {
-
+
log.info("Removing unnecessary load operation from location: "+ifile);
-
+
// In this case we remember the input file
// spec in the store. We might have to use it
// in the MR compiler to recreate the load, if
@@ -1728,27 +1815,27 @@ public class PigServer {
store.setInputSpec(load.getInputFile());
LogicalOperator storePred = lp.getPredecessors(store).get(0);
-
+
// In this case we remember the input file
// spec in the store. We might have to use it
// in the MR compiler to recreate the load, if
// the store happens on a job boundary.
store.setInputSpec(load.getInputFile());
-
+
Schema storePredSchema = storePred.getSchema();
if(storePredSchema != null) {
load.setSchema(storePredSchema);
- TypeCastInserter typeCastInserter = new TypeCastInserter(lp, LOLoad.class.getName());
+ TypeCastInserter typeCastInserter = new TypeCastInserter(lp, LOLoad.class.getName());
List<LogicalOperator> loadList = new ArrayList<LogicalOperator>();
loadList.add(load);
//the following needs a change to TypeCastInserter and LogicalTransformer
typeCastInserter.doTransform(loadList, false);
}
-
+
lp.disconnect(store, load);
lp.connect(storePred, load);
lp.removeAndReconnectMultiSucc(load);
-
+
List<LogicalOperator> succs = lp.getSuccessors(load);
} else {
try {
@@ -1757,7 +1844,7 @@ public class PigServer {
int errCode = 2128;
String msg = "Failed to connect store with dependent load.";
throw new FrontendException(msg, errCode, ex);
- }
+ }
}
*/
}
Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1056801&r1=1056800&r2=1056801&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Sat Jan 8 21:13:46 2011
@@ -21,6 +21,7 @@ package org.apache.pig.test;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -32,6 +33,7 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
@@ -47,6 +49,7 @@ import junit.framework.TestCase;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -624,6 +627,51 @@ public class TestPigServer extends TestC
}
}
+ // build the pig script from in-memory, and wrap it as ByteArrayInputStream
+ @Test
+ public void testRegisterScriptFromStream() throws Exception{
+ // using params map
+ PigServer pig=new PigServer(ExecType.LOCAL);
+ Map<String,String> params=new HashMap<String, String>();
+ params.put("input", "test/org/apache/pig/test/data/passwd");
+ String script="a = load '$input' using PigStorage(':');";
+ pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params);
+ Iterator<Tuple> iter=pig.openIterator("a");
+ int index=0;
+ List<Tuple> expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":");
+ while(iter.hasNext()){
+ Tuple tuple=iter.next();
+ assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString());
+ index++;
+ }
+
+ // using param file
+ pig=new PigServer(ExecType.LOCAL);
+ List<String> paramFile=new ArrayList<String>();
+ paramFile.add(Util.createFile(new String[]{"input=test/org/apache/pig/test/data/passwd2"}).getAbsolutePath());
+ pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),paramFile);
+ iter=pig.openIterator("a");
+ index=0;
+ expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd2", ":");
+ while(iter.hasNext()){
+ Tuple tuple=iter.next();
+ assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString());
+ index++;
+ }
+
+ // using both param value and param file, param value should override param file
+ pig=new PigServer(ExecType.LOCAL);
+ pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params,paramFile);
+ iter=pig.openIterator("a");
+ index=0;
+ expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":");
+ while(iter.hasNext()){
+ Tuple tuple=iter.next();
+ assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString());
+ index++;
+ }
+ }
+
@Test
public void testPigProperties() throws Throwable {
File propertyFile = new File("pig.properties");