You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/04/23 18:57:20 UTC
svn commit: r767974 [3/4] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/executionengine/util/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/execu...
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Thu Apr 23 16:57:16 2009
@@ -27,14 +27,17 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
+import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.FileNotFoundException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.Map;
+import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
+import java.util.Date;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.PrintStream;
@@ -88,46 +91,78 @@
}
private void init() {
- // nothing, for now.
- }
-
- public void parseStopOnError() throws IOException, ParseException
- {
- prompt();
mDone = false;
- while(!mDone) {
- parse();
+ mLoadOnly = false;
+ mExplain = null;
+ }
+
+ private void setBatchOn() {
+ mPigServer.setBatchOn();
+ }
+
+ private void executeBatch() throws IOException {
+ if (mPigServer.isBatchOn()) {
+ if (mExplain != null) {
+ explainCurrentBatch();
+ }
+
+ if (!mLoadOnly) {
+ mPigServer.executeBatch();
+ }
}
}
- public void parseContOnError()
+ private void discardBatch() throws IOException {
+ if (mPigServer.isBatchOn()) {
+ mPigServer.discardBatch();
+ }
+ }
+
+ public void parseStopOnError() throws IOException, ParseException
{
- prompt();
- mDone = false;
- while(!mDone) {
- try
- {
- parse();
- }
- catch(Exception e)
- {
- Exception pe = LogUtils.getPermissionException(e);
- if (pe != null)
- log.error("You don't have permission to perform the operation. Error from the server: " + pe.getMessage());
- else {
- ByteArrayOutputStream bs = new ByteArrayOutputStream();
- e.printStackTrace(new PrintStream(bs));
- log.error(bs.toString());
- //log.error(e.getMessage());
- //log.error(e);
- }
+ parseStopOnError(false);
+ }
+
+ /**
+ * Parses Pig commands in either interactive mode or batch mode.
+ * In interactive mode, executes the plan right away whenever a
+ * STORE command is encountered.
+ *
+ * @throws IOException, ParseException
+ */
+ public void parseStopOnError(boolean sameBatch) throws IOException, ParseException
+ {
+ if (mPigServer == null) {
+ throw new IllegalStateException();
+ }
+
+ if (!mInteractive && !sameBatch) {
+ setBatchOn();
+ }
- } catch (Error e) {
- log.error(e);
+ try {
+ prompt();
+ mDone = false;
+ while(!mDone) {
+ parse();
}
+
+ if (!sameBatch) {
+ executeBatch();
+ }
+ }
+ finally {
+ if (!sameBatch) {
+ discardBatch();
+ }
}
}
+ public void setLoadOnly(boolean loadOnly)
+ {
+ mLoadOnly = loadOnly;
+ }
+
public void setParams(PigServer pigServer)
{
mPigServer = pigServer;
@@ -153,10 +188,7 @@
public void prompt()
{
- if (mInteractive)
- {
- /*System.err.print("grunt> ");
- System.err.flush();*/
+ if (mInteractive) {
mConsoleReader.setDefaultPrompt("grunt> ");
}
}
@@ -165,6 +197,10 @@
{
mDone = true;
}
+
+ public boolean isDone() {
+ return mDone;
+ }
protected void processDescribe(String alias) throws IOException {
if(alias==null) {
@@ -173,21 +209,100 @@
mPigServer.dumpSchema(alias);
}
- protected void printAliases() throws IOException {
- mPigServer.printAliases();
+ protected void processExplain(String alias, String script, boolean isVerbose,
+ String format, String target,
+ List<String> params, List<String> files)
+ throws IOException, ParseException {
+
+ if (null != mExplain) {
+ return;
+ }
+
+ try {
+ mExplain = new ExplainState(alias, target, script, isVerbose, format);
+
+ if (script != null) {
+ if (!"true".equalsIgnoreCase(mPigServer.
+ getPigContext()
+ .getProperties().
+ getProperty("opt.multiquery","true"))) {
+ throw new ParseException("Cannot explain script if multiquery is disabled.");
+ }
+ setBatchOn();
+ try {
+ loadScript(script, true, true, params, files);
+ } catch(IOException e) {
+ discardBatch();
+ throw e;
+ } catch (ParseException e) {
+ discardBatch();
+ throw e;
+ }
+ }
+
+ mExplain.mLast = true;
+ explainCurrentBatch();
+
+ } finally {
+ if (script != null) {
+ discardBatch();
+ }
+ mExplain = null;
+ }
}
+ protected void explainCurrentBatch() throws IOException {
+ PrintStream lp = System.out;
+ PrintStream pp = System.out;
+ PrintStream ep = System.out;
+
+ if (!(mExplain.mLast && mExplain.mCount == 0)) {
+ if (mPigServer.isBatchEmpty()) {
+ return;
+ }
+ }
+
+ mExplain.mCount++;
+ boolean markAsExecuted = (mExplain.mScript != null);
- protected void processExplain(String alias) throws IOException {
- mPigServer.explain(alias, System.out);
+ if (mExplain.mTarget != null) {
+ File file = new File(mExplain.mTarget);
+
+ if (file.isDirectory()) {
+ String sCount = (mExplain.mLast && mExplain.mCount == 1)?"":"_"+mExplain.mCount;
+ lp = new PrintStream(new File(file, "logical_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
+ pp = new PrintStream(new File(file, "physical_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
+ ep = new PrintStream(new File(file, "exec_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
+ mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
+ mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+ lp.close();
+ pp.close();
+ ep.close();
+ }
+ else {
+ boolean append = !(mExplain.mCount==1);
+ lp = pp = ep = new PrintStream(new FileOutputStream(mExplain.mTarget, append));
+ mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
+ mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+ lp.close();
+ }
+ }
+ else {
+ mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
+ mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+ }
+ }
+
+ protected void printAliases() throws IOException {
+ mPigServer.printAliases();
}
protected void processRegister(String jar) throws IOException {
mPigServer.registerJar(jar);
}
- private String runPreprocessor(String script, ArrayList<String> params,
- ArrayList<String> files)
+ private String runPreprocessor(String script, List<String> params,
+ List<String> files)
throws IOException, ParseException {
ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
@@ -206,7 +321,30 @@
}
protected void processScript(String script, boolean batch,
- ArrayList<String> params, ArrayList<String> files)
+ List<String> params, List<String> files)
+ throws IOException, ParseException {
+
+ if (script == null) {
+ executeBatch();
+ return;
+ }
+
+ if (batch) {
+ setBatchOn();
+ mPigServer.setJobName(script);
+ try {
+ loadScript(script, true, mLoadOnly, params, files);
+ executeBatch();
+ } finally {
+ discardBatch();
+ }
+ } else {
+ loadScript(script, false, mLoadOnly, params, files);
+ }
+ }
+
+ private void loadScript(String script, boolean batch, boolean loadOnly,
+ List<String> params, List<String> files)
throws IOException, ParseException {
Reader inputReader;
@@ -217,6 +355,9 @@
String cmds = runPreprocessor(script, params, files);
if (mInteractive && !batch) { // Write prompt and echo commands
+ // Console reader treats tabs in a special way
+ cmds = cmds.replaceAll("\t"," ");
+
reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()),
new OutputStreamWriter(System.out));
reader.setHistory(mConsoleReader.getHistory());
@@ -234,16 +375,18 @@
throw new ParseException("Cannot access file: " + script);
}
- // In batch mode: Use a new server to avoid side-effects (handles, etc)
- PigServer pigServer = batch ?
- new PigServer(mPigServer.getPigContext(), false) : mPigServer;
-
GruntParser parser = new GruntParser(inputReader);
- parser.setParams(pigServer);
+ parser.setParams(mPigServer);
parser.setConsoleReader(reader);
parser.setInteractive(interactive);
+ parser.setLoadOnly(loadOnly);
+ parser.mExplain = mExplain;
- parser.parseStopOnError();
+ parser.prompt();
+ while(!parser.isDone()) {
+ parser.parse();
+ }
+
if (interactive) {
System.out.println("");
}
@@ -261,7 +404,6 @@
}
else if (key.equals("job.name"))
{
- //mPigServer.setJobName(unquote(value));
mPigServer.setJobName(value);
}
else if (key.equals("stream.skippath")) {
@@ -284,6 +426,8 @@
protected void processCat(String path) throws IOException
{
+ executeBatch();
+
try {
byte buffer[] = new byte[65536];
ElementDescriptor dfsPath = mDfs.asElement(path);
@@ -356,10 +500,10 @@
protected void processDump(String alias) throws IOException
{
- Iterator result = mPigServer.openIterator(alias);
+ Iterator<Tuple> result = mPigServer.openIterator(alias);
while (result.hasNext())
{
- Tuple t = (Tuple) result.next();
+ Tuple t = result.next();
System.out.println(t);
}
}
@@ -379,7 +523,7 @@
else
{
job.killJob();
- log.error("kill submited.");
+ log.error("kill submitted.");
}
}
}
@@ -458,6 +602,8 @@
protected void processMove(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mDfs.asElement(src);
ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -475,6 +621,8 @@
protected void processCopy(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mDfs.asElement(src);
ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -488,6 +636,8 @@
protected void processCopyToLocal(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mDfs.asElement(src);
ElementDescriptor dstPath = mLfs.asElement(dst);
@@ -501,6 +651,8 @@
protected void processCopyFromLocal(String src, String dst) throws IOException
{
+ executeBatch();
+
try {
ElementDescriptor srcPath = mLfs.asElement(src);
ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -521,17 +673,23 @@
protected void processPig(String cmd) throws IOException
{
int start = 1;
- if (!mInteractive)
+ if (!mInteractive) {
start = getLineNumber();
- if (cmd.charAt(cmd.length() - 1) != ';')
- mPigServer.registerQuery(cmd + ";", start);
- else
+ }
+
+ if (cmd.charAt(cmd.length() - 1) != ';') {
+ mPigServer.registerQuery(cmd + ";", start);
+ }
+ else {
mPigServer.registerQuery(cmd, start);
+ }
}
protected void processRemove(String path, String options ) throws IOException
{
ElementDescriptor dfsPath = mDfs.asElement(path);
+
+ executeBatch();
if (!dfsPath.exists()) {
if (options == null || !options.equalsIgnoreCase("force")) {
@@ -544,11 +702,35 @@
}
}
+ private class ExplainState {
+ public long mTime;
+ public int mCount;
+ public String mAlias;
+ public String mTarget;
+ public String mScript;
+ public boolean mVerbose;
+ public String mFormat;
+ public boolean mLast;
+
+ public ExplainState(String alias, String target, String script,
+ boolean verbose, String format) {
+ mTime = new Date().getTime();
+ mCount = 0;
+ mAlias = alias;
+ mTarget = target;
+ mScript = script;
+ mVerbose = verbose;
+ mFormat = format;
+ mLast = false;
+ }
+ }
+
private PigServer mPigServer;
private DataStorage mDfs;
private DataStorage mLfs;
private Properties mConf;
private JobClient mJobClient;
private boolean mDone;
-
+ private boolean mLoadOnly;
+ private ExplainState mExplain;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Thu Apr 23 16:57:16 2009
@@ -31,6 +31,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Stack;
+import java.util.List;
import java.util.ArrayList;
import jline.ConsoleReader;
@@ -65,7 +66,7 @@
abstract protected void processDescribe(String alias) throws IOException;
- abstract protected void processExplain(String alias) throws IOException;
+ abstract protected void processExplain(String alias, String script, boolean isVerbose, String format, String target, List<String> params, List<String> files) throws IOException, ParseException;
abstract protected void processRegister(String jar) throws IOException;
@@ -101,7 +102,7 @@
abstract protected void processIllustrate(String alias) throws IOException;
- abstract protected void processScript(String script, boolean batch, ArrayList<String> params, ArrayList<String> files) throws IOException, ParseException;
+ abstract protected void processScript(String script, boolean batch, List<String> params, List<String> files) throws IOException, ParseException;
static String unquote(String s)
{
@@ -152,6 +153,10 @@
TOKEN: {<EXEC: "exec">}
TOKEN: {<PARAM: "-param">}
TOKEN: {<PARAM_FILE: "-param_file">}
+TOKEN: {<SCRIPT: "-script">}
+TOKEN: {<DOT: "-dot">}
+TOKEN: {<OUT: "-out">}
+TOKEN: {<BRIEF: "-brief">}
// internal use commands
TOKEN: {<SCRIPT_DONE: "scriptDone">}
@@ -409,9 +414,7 @@
<ALIASES>
{printAliases();}
|
- <EXPLAIN>
- t1 = <IDENTIFIER>
- {processExplain(t1.image);}
+ Explain()
|
<HELP>
{printHelp();}
@@ -486,10 +489,58 @@
)
}
+void Explain() throws IOException:
+{
+ Token t;
+ String alias = null;
+ String script = null;
+ String format="text";
+ String target=null;
+ boolean isVerbose = true;
+ ArrayList<String> params;
+ ArrayList<String> files;
+
+}
+{
+ <EXPLAIN>
+ {
+ params = new ArrayList<String>();
+ files = new ArrayList<String>();
+ }
+ (
+ <BRIEF>
+ {isVerbose = false;}
+ |
+ <DOT>
+ {format = "dot";}
+ |
+ <OUT>
+ t = GetPath()
+ {target = t.image;}
+ |
+ <SCRIPT>
+ t = GetPath()
+ {script = t.image;}
+ |
+ <PARAM>
+ t = GetPath()
+ {params.add(t.image);}
+ |
+ <PARAM_FILE>
+ t = GetPath()
+ {files.add(t.image);}
+ )*
+ (
+ t = <IDENTIFIER>
+ {alias = t.image;}
+ )?
+ {processExplain(alias, script, isVerbose, format, target, params, files);}
+}
+
void Script() throws IOException:
{
Token t;
- String script;
+ String script = null;
boolean batch = false;
ArrayList<String> params;
ArrayList<String> files;
@@ -515,8 +566,10 @@
t = GetPath()
{files.add(t.image);}
)*
- t = GetPath()
- {script = t.image;}
+ (
+ t = GetPath()
+ {script = t.image;}
+ )?
{processScript(script, batch, params, files);}
}
@@ -608,6 +661,14 @@
|
t = <EXEC>
|
+ t = <OUT>
+ |
+ t = <SCRIPT>
+ |
+ t = <DOT>
+ |
+ t = <BRIEF>
+ |
t = <PARAM>
|
t = <PARAM_FILE>
Modified: hadoop/pig/trunk/test/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/MiniCluster.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/MiniCluster.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/MiniCluster.java Thu Apr 23 16:57:16 2009
@@ -105,6 +105,10 @@
public Properties getProperties() {
return ConfigurationUtil.toProperties(m_conf);
}
+
+ public void setProperty(String name, String value) {
+ m_conf.set(name, value);
+ }
public FileSystem getFileSystem() {
return m_fileSys;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/RangeSlicer.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/RangeSlicer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/RangeSlicer.java Thu Apr 23 16:57:16 2009
@@ -39,16 +39,19 @@
public class RangeSlicer
implements Slicer, LoadFunc
{
+ int numslices = 0;
+
+ public RangeSlicer(String num) {
+ numslices = Integer.parseInt(num);
+ }
/**
- * Expects location to be a Stringified integer, and makes
- * Integer.parseInt(location) slices. Each slice generates a single value,
+ * Each slice generates a single value,
* its index in the sequence of slices.
*/
public Slice[] slice (DataStorage store, String location)
throws IOException
{
- int numslices = Integer.parseInt(location);
Slice[] slices = new Slice[numslices];
for (int i = 0; i < slices.length; i++) {
slices[i] = new SingleValueSlice(i);
@@ -57,10 +60,8 @@
}
public void validate(DataStorage store, String location) throws IOException {
- try {
- Integer.parseInt(location);
- } catch (NumberFormatException nfe) {
- throw new IOException(nfe.getMessage());
+ if (!location.matches(".*/tmp/foo.*")) {
+ throw new IOException("Wrong Path");
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Thu Apr 23 16:57:16 2009
@@ -36,6 +36,8 @@
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+
public class TestCombiner extends TestCase {
@@ -52,7 +54,9 @@
@Test
public void testLocal() throws Exception {
// run the test locally
+ FileLocalizer.deleteTempFiles();
runTest(new PigServer(ExecType.LOCAL, new Properties()));
+ FileLocalizer.deleteTempFiles();
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java Thu Apr 23 16:57:16 2009
@@ -77,9 +77,9 @@
if(execType == ExecType.LOCAL)
return;
int numvals = 50;
- String query = "vals = foreach (group (load '"
+ String query = "vals = foreach (group (load '/tmp/foo"
+ numvals
- + "'using org.apache.pig.test.RangeSlicer()) all) generate COUNT($1);";
+ + "'using org.apache.pig.test.RangeSlicer('50')) all) generate COUNT($1);";
pigServer.registerQuery(query);
Iterator<Tuple> it = pigServer.openIterator("vals");
Tuple cur = it.next();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestFilterOpString.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFilterOpString.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestFilterOpString.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestFilterOpString.java Thu Apr 23 16:57:16 2009
@@ -32,6 +32,8 @@
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+
import junit.framework.TestCase;
public class TestFilterOpString extends TestCase {
@@ -46,6 +48,7 @@
@Before
@Override
protected void setUp() throws Exception {
+ FileLocalizer.deleteTempFiles();
pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Thu Apr 23 16:57:16 2009
@@ -42,6 +42,7 @@
public TestGrunt(String name) {
super(name);
+ cluster.setProperty("opt.multiquery","true");
basedir = "test/org/apache/pig/test/data";
}
@@ -62,7 +63,7 @@
@Test
public void testDefine() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "define myudf org.apache.pig.builtin.AVG();\n";
@@ -82,7 +83,7 @@
@Test
public void testBagSchema() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1' as (b: bag{t(i: int, c:chararray, f: float)});\n";
@@ -97,7 +98,7 @@
@Test
public void testBagSchemaFail() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'as (b: bag{t(i: int, c:chararray, f: float)});\n";
@@ -118,7 +119,7 @@
@Test
public void testBagConstant() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; b = foreach a generate {(1, '1', 0.4f),(2, '2', 0.45)};\n";
@@ -133,7 +134,7 @@
@Test
public void testBagConstantWithSchema() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; b = foreach a generate {(1, '1', 0.4f),(2, '2', 0.45)} as b: bag{t(i: int, c:chararray, d: double)};\n";
@@ -148,7 +149,7 @@
@Test
public void testBagConstantInForeachBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; b = foreach a {generate {(1, '1', 0.4f),(2, '2', 0.45)};};\n";
@@ -163,7 +164,7 @@
@Test
public void testBagConstantWithSchemaInForeachBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; b = foreach a {generate {(1, '1', 0.4f),(2, '2', 0.45)} as b: bag{t(i: int, c:chararray, d: double)};};\n";
@@ -178,7 +179,7 @@
@Test
public void testParsingAsInForeachBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); b = group a by foo; c = foreach b {generate SUM(a.fast) as fast;};\n";
@@ -193,7 +194,7 @@
@Test
public void testParsingAsInForeachWithOutBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); b = group a by foo; c = foreach b generate SUM(a.fast) as fast;\n";
@@ -208,7 +209,7 @@
@Test
public void testParsingWordWithAsInForeachBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); b = group a by foo; c = foreach b {generate SUM(a.fast);};\n";
@@ -223,7 +224,7 @@
@Test
public void testParsingWordWithAsInForeachWithOutBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); b = group a by foo; c = foreach b generate SUM(a.fast);\n";
@@ -238,7 +239,7 @@
@Test
public void testParsingWordWithAsInForeachWithOutBlock2() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "cash = load 'foo' as (foo, fast); b = foreach cash generate fast * 2.0;\n";
@@ -254,7 +255,7 @@
@Test
public void testParsingGenerateInForeachBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); b = group a by foo; c = foreach b {generate a.regenerate;};\n";
@@ -269,7 +270,7 @@
@Test
public void testParsingGenerateInForeachWithOutBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); b = group a by foo; c = foreach b generate a.regenerate;\n";
@@ -284,7 +285,7 @@
@Test
public void testParsingAsGenerateInForeachBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); b = group a by foo; c = foreach b {generate {(1, '1', 0.4f),(2, '2', 0.45)} as b: bag{t(i: int, cease:chararray, degenerate: double)}, SUM(a.fast) as fast, a.regenerate as degenerated;};\n";
@@ -299,7 +300,7 @@
@Test
public void testParsingAsGenerateInForeachWithOutBlock() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); b = group a by foo; c = foreach b generate {(1, '1', 0.4f),(2, '2', 0.45)} as b: bag{t(i: int, cease:chararray, degenerate: double)}, SUM(a.fast) as fast, a.regenerate as degenerated;\n";
@@ -314,7 +315,7 @@
@Test
public void testRunStatment() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate);" +
@@ -331,13 +332,13 @@
@Test
public void testExecStatment() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
boolean caught = false;
String strCmd = "a = load 'foo' as (foo, fast, regenerate);" +
" exec -param LIMIT=5 -param FUNCTION=COUNT " +
- "-param FILE=foo " + basedir + "/testsub.pig; explain bar";
+ "-param FILE=foo " + basedir + "/testsub.pig; explain bar;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
@@ -355,7 +356,7 @@
@Test
public void testRunStatmentNested() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); run "
@@ -371,7 +372,7 @@
@Test
public void testExecStatmentNested() throws Throwable {
- PigServer server = new PigServer("MAPREDUCE");
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
PigContext context = server.getPigContext();
boolean caught = false;
@@ -391,4 +392,155 @@
}
assertTrue(caught);
}
+
+ @Test
+ public void testExplainEmpty() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "a = load 'foo' as (foo, fast, regenerate); run "
+ +basedir+"/testsubnested_run.pig; explain";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testExplainScript() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -script "
+ +basedir+"/testsubnested_run.pig;";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testExplainBrief() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -brief -script "
+ +basedir+"/testsubnested_run.pig;";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testExplainDot() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -dot -script "
+ +basedir+"/testsubnested_run.pig;";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testExplainOut() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -out /tmp -script "
+ +basedir+"/testsubnested_run.pig;";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testPartialExecution() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd = "rmf bar; rmf baz; a = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"store a into 'bar'; exec; a = load 'bar'; store a into 'baz';\n";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testFileCmds() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd =
+ "rmf bar; rmf baz;"
+ +"a = load 'file:test/org/apache/pig/test/data/passwd';"
+ +"store a into 'bar';"
+ +"cp bar baz;"
+ +"rm bar; rm baz;"
+ +"store a into 'baz';"
+ +"store a into 'bar';"
+ +"rm baz; rm bar;"
+ +"store a into 'baz';"
+ +"mv baz bar;"
+ +"b = load 'bar';"
+ +"store b into 'baz';"
+ +"cat baz;"
+ +"rm baz;"
+ +"rm bar;\n";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
+
+ @Test
+ public void testCD() throws Throwable {
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext context = server.getPigContext();
+
+ String strCmd =
+ "mkdir /tmp;"
+ +"mkdir /tmp/foo;"
+ +"cd /tmp;"
+ +"rmf bar; rmf foo/baz;"
+ +"copyFromLocal test/org/apache/pig/test/data/passwd bar;"
+ +"a = load 'bar';"
+ +"cd foo;"
+ +"store a into 'baz';"
+ +"cd /;"
+ +"rm /tmp/bar; rm /tmp/foo/baz;";
+
+ ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+ InputStreamReader reader = new InputStreamReader(cmd);
+
+ Grunt grunt = new Grunt(new BufferedReader(reader), context);
+
+ grunt.exec();
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Thu Apr 23 16:57:16 2009
@@ -122,6 +122,37 @@
@Test
public void testLoadFromHBase() throws IOException, ExecException {
prepareTable();
+ pig.registerQuery("a = load 'hbase://" + TESTTABLE + "' using " +
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A +
+ " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while(it.hasNext()){
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase "+ t);
+ String col_a = ((DataByteArray)t.get(0)).toString();
+ int col_b = (Integer)t.get(1);
+ String col_c = ((DataByteArray)t.get(2)).toString();
+
+ assertEquals(String.valueOf(count), col_a);
+ assertEquals(count, col_b);
+ assertEquals("TEXT" + count, col_c);
+
+ count++;
+ }
+ assertEquals(TEST_ROW_COUNT, count);
+ System.err.println("LoadFromHBase done");
+ }
+
+ /**
+ * load from hbase test w/o hbase:// prefix
+ * @throws IOException
+ * @throws ExecException
+ */
+ @Test
+ public void testBackwardsCompatibility() throws IOException, ExecException {
+ prepareTable();
pig.registerQuery("a = load '" + TESTTABLE + "' using " +
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A +
" " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Thu Apr 23 16:57:16 2009
@@ -468,11 +468,11 @@
ExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
- JobControlCompiler jcc = new JobControlCompiler();
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf);
try {
- jcc.compile(mrPlan, "Test", conf, pc);
+ jcc.compile(mrPlan, "Test");
} catch (JobCreationException jce) {
- assertTrue(jce.getErrorCode() == 1068);
+ assertTrue(jce.getErrorCode() == 1068);
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java Thu Apr 23 16:57:16 2009
@@ -17,10 +17,15 @@
*/
package org.apache.pig.test;
+import java.util.*;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
@@ -31,11 +36,22 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -45,15 +61,20 @@
POLoad ld;
PigContext pc;
DataBag inpDB;
+ String curDir;
+ String inpDir;
+ PigServer pig;
static MiniCluster cluster = MiniCluster.buildCluster();
@Before
public void setUp() throws Exception {
- String curDir = System.getProperty("user.dir");
- String inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
+ curDir = System.getProperty("user.dir");
+ inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
inpFSpec = new FileSpec("file:" + inpDir + "passwd", new FuncSpec(PigStorage.class.getName(), new String[]{":"}));
- pc = new PigContext();
- pc.connect();
+
+ FileLocalizer.deleteTempFiles();
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pc = pig.getPigContext();
ld = GenPhyOp.topLoadOp();
ld.setLFile(inpFSpec);
@@ -89,4 +110,82 @@
assertEquals(true, size==inpDB.size());
}
+ @Test
+ public void testLoadLocalRel() throws Exception {
+ checkLoadPath("file:test/org/apache/pig/test/data/passwd", "", true);
+ }
+
+ @Test
+ public void testLoadLocalAbs() throws Exception {
+ checkLoadPath("file:"+curDir + File.separatorChar+"test/org/apache/pig/test/data/passwd", "", true);
+ }
+
+ @Test
+ public void testLoadRemoteRel() throws Exception {
+ checkLoadPath("test","/tmp/test");
+ }
+
+ @Test
+ public void testLoadRemoteAbs() throws Exception {
+ checkLoadPath("/tmp/test","/tmp/test");
+ }
+
+ @Test
+ public void testLoadRemoteRelScheme() throws Exception {
+ checkLoadPath("test","/tmp/test");
+ }
+
+ @Test
+ public void testLoadRemoteAbsScheme() throws Exception {
+ checkLoadPath("hdfs:/tmp/test","/tmp/test");
+ }
+
+ @Test
+ public void testLoadRemoteAbsAuth() throws Exception {
+ checkLoadPath("hdfs://localhost:9000/test","/test");
+ }
+
+ @Test
+ public void testLoadRemoteNormalize() throws Exception {
+ checkLoadPath("/tmp/foo/../././","/tmp");
+ }
+
+ private void checkLoadPath(String orig, String expected) throws Exception {
+ checkLoadPath(orig, expected, false);
+ }
+
+ private void checkLoadPath(String orig, String expected, boolean isTmp) throws Exception {
+ pc.getProperties().setProperty("opt.multiquery",""+true);
+
+ DataStorage dfs = pc.getDfs();
+ dfs.setActiveContainer(dfs.asContainer("/tmp"));
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+ Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
+ Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+ Map<String, String> fileNameMap = new HashMap<String, String>();
+
+ LogicalPlanBuilder builder = new LogicalPlanBuilder(pc);
+
+ String query = "a = load '"+orig+"';";
+ LogicalPlan lp = builder.parse("Test-Load",
+ query,
+ aliases,
+ logicalOpTable,
+ aliasOp,
+ fileNameMap);
+ Assert.assertTrue(lp.size()>0);
+ LogicalOperator op = lp.getRoots().get(0);
+
+ Assert.assertTrue(op instanceof LOLoad);
+ LOLoad load = (LOLoad)op;
+
+ String p = load.getInputFile().getFileName();
+ p = p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/","/");
+
+ if (isTmp) {
+ Assert.assertTrue(p.matches("/tmp.*"));
+ } else {
+ Assert.assertEquals(p, expected);
+ }
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalJobSubmission.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalJobSubmission.java Thu Apr 23 16:57:16 2009
@@ -32,7 +32,9 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -130,7 +132,7 @@
rmrf(outDir);
}
- private void generateInput(int numTuples) throws ExecException{
+ private void generateInput(int numTuples) throws Exception{
DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
@@ -146,13 +148,18 @@
inps.add(proj);
POStore str = new POStore(new OperatorKey("", r.nextLong()));
- str.setInputs(inps);
FileSpec fSpec = new FileSpec(ldFile, new FuncSpec(PigStorage.class.getName()));
str.setSFile(fSpec);
- str.setPc(pc);
- str.store();
+ str.setStoreImpl(new LocalPOStoreImpl(pc));
+
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(proj);
+ pp.add(str);
+ pp.connect(proj,str);
+
+ new LocalPigLauncher().launchPig(pp, "TestLocalJobSubmission", pc);
}
/*private void setUp1(boolean gen) throws Exception {
@@ -389,7 +396,6 @@
POStore st = new POStore(new OperatorKey("", r.nextLong()));
ld.setPc(pc);
ld.setLFile(LFSpec);
- st.setPc(pc);
st.setSFile(SFSpec);
Tuple sample = new DefaultTuple();
@@ -449,7 +455,7 @@
private void submit() throws Exception{
assertEquals(true, FileLocalizer.fileExists(ldFile, pc));
- LocalLauncher ll = new LocalLauncher();
+ MapReduceLauncher ll = new MapReduceLauncher();
ll.launchPig(php, grpName, pc);
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java Thu Apr 23 16:57:16 2009
@@ -154,7 +154,7 @@
try {
LogicalPlan lp = builder.parse("Test-Plan-Builder", query, aliases,
- logicalOpTable, aliasOp);
+ logicalOpTable, aliasOp, fileNameMap);
List<LogicalOperator> roots = lp.getRoots();
if (roots.size() > 0) {
@@ -241,4 +241,5 @@
Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+ Map<String, String> fileNameMap = new HashMap<String, String>();
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java Thu Apr 23 16:57:16 2009
@@ -69,7 +69,7 @@
Random r = new Random();
PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
-
+
private boolean generate = false;
@@ -114,6 +114,7 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld");
@@ -125,6 +126,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -143,7 +145,8 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
-
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
+
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Sort.gld");
fos.write(baos.toByteArray());
@@ -154,7 +157,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -174,6 +177,7 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Distinct.gld");
@@ -185,7 +189,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -206,6 +210,7 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Cogroup.gld");
@@ -217,7 +222,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -242,7 +247,8 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
-
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
+
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld");
fos.write(baos.toByteArray());
@@ -253,7 +259,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -275,6 +281,7 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Comparison.gld");
@@ -286,7 +293,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -308,7 +315,8 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
-
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
+
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/BinCond.gld");
fos.write(baos.toByteArray());
@@ -319,7 +327,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -342,7 +350,8 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
-
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
+
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Generate.gld");
fos.write(baos.toByteArray());
@@ -353,7 +362,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -373,7 +382,8 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
-
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
+
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Union.gld");
fos.write(baos.toByteArray());
@@ -384,7 +394,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -405,7 +415,8 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
-
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
+
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Split1.gld");
fos.write(baos.toByteArray());
@@ -421,7 +432,8 @@
//System.out.println("Length of first plan = " + len + " of second = " + test);
String goldenPlan1 = new String(b1, 0, len);
String goldenPlan2 = new String(b2, 0, len);
-
+ goldenPlan1 = goldenPlan1.replaceAll("Load(.*)","Load()");
+ goldenPlan2 = goldenPlan2.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -458,7 +470,8 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
-
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
+
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/IsNull1.gld");
fos.write(baos.toByteArray());
@@ -474,7 +487,8 @@
//System.out.println("Length of first plan = " + len + " of second = " + test + " Length of compiled plan = " + compiledPlan.length());
String goldenPlan1 = new String(b1, 0, len);
String goldenPlan2 = new String(b2, 0, len);
-
+ goldenPlan1 = goldenPlan1.replaceAll("Load(.*)","Load()");
+ goldenPlan2 = goldenPlan2.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -507,6 +521,7 @@
pp.explain(baos);
baos.write((int)'\n');
String compiledPlan = baos.toString();
+ compiledPlan = compiledPlan.replaceAll("Load(.*)","Load()");
if(generate){
FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Limit.gld");
@@ -518,7 +533,7 @@
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
String goldenPlan = new String(b, 0, len);
-
+ goldenPlan = goldenPlan.replaceAll("Load(.*)","Load()");
System.out.println();
System.out.println(compiledPlan);
@@ -622,14 +637,19 @@
public LogicalPlan buildPlan(String query, ClassLoader cldr) {
LogicalPlanBuilder.classloader = cldr;
PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
- LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
-
try {
+
+ pigContext.connect();
+
+ LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
+
+
LogicalPlan lp = builder.parse("Test-Plan-Builder",
query,
aliases,
logicalOpTable,
- aliasOp);
+ aliasOp,
+ fileNameMap);
List<LogicalOperator> roots = lp.getRoots();
@@ -679,4 +699,5 @@
Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+ Map<String, String> fileNameMap = new HashMap<String, String>();
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Apr 23 16:57:16 2009
@@ -1954,14 +1954,17 @@
public LogicalPlan buildPlan(String query, ClassLoader cldr) {
LogicalPlanBuilder.classloader = cldr;
- LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
try {
+ pigContext.connect();
+ LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
+
LogicalPlan lp = builder.parse("Test-Plan-Builder",
query,
aliases,
logicalOpTable,
- aliasOp);
+ aliasOp,
+ fileNameMap);
List<LogicalOperator> roots = lp.getRoots();
if(roots.size() > 0) {
@@ -1995,5 +1998,6 @@
Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+ Map<String, String> fileNameMap = new HashMap<String, String>();
PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Thu Apr 23 16:57:16 2009
@@ -98,7 +98,7 @@
// WILL OVERWRITE THE GOLDEN FILES - So use this
// with caution and only for the testcases you need
// and are sure of
- private boolean generate = true;
+ private boolean generate = false;
@Before
public void setUp() throws ExecException {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigScriptParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigScriptParser.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigScriptParser.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigScriptParser.java Thu Apr 23 16:57:16 2009
@@ -46,58 +46,63 @@
Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>() ;
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>() ;
+ Map<String, String> fileNameMap = new HashMap<String, String>();
PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties()) ;
+ pigContext.connect();
String tempFile = this.prepareTempFile() ;
// Start the real parsing job
{
+
// Initial statement
String query = String.format("A = LOAD '%s' ;", Util.encodeEscape(tempFile)) ;
ByteArrayInputStream in = new ByteArrayInputStream(query.getBytes());
- QueryParser parser = new QueryParser(in, pigContext, "scope", aliases, opTable, aliasOp) ;
+ QueryParser parser = new QueryParser(in, pigContext, "scope", aliases, opTable, aliasOp, fileNameMap) ;
LogicalPlan lp = parser.Parse() ;
}
{
// Normal condition
String query = "B1 = filter A by $0 eq 'This is a test string' ;" ;
- checkParsedConstContent(aliases, opTable, pigContext, aliasOp,
+ checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
query, "This is a test string") ;
}
{
// single-quote condition
String query = "B2 = filter A by $0 eq 'This is a test \\'string' ;" ;
- checkParsedConstContent(aliases, opTable, pigContext, aliasOp,
+ checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
query, "This is a test 'string") ;
}
{
// newline condition
String query = "B3 = filter A by $0 eq 'This is a test \\nstring' ;" ;
- checkParsedConstContent(aliases, opTable, pigContext, aliasOp,
+ checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
query, "This is a test \nstring") ;
}
{
// Unicode
String query = "B4 = filter A by $0 eq 'This is a test \\uD30C\\uC774string' ;" ;
- checkParsedConstContent(aliases, opTable, pigContext, aliasOp,
+ checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
query, "This is a test \uD30C\uC774string") ;
}
}
private void checkParsedConstContent(Map<LogicalOperator, LogicalPlan> aliases,
- Map<OperatorKey, LogicalOperator> opTable,
- PigContext pigContext,
- Map<String, LogicalOperator> aliasOp,
- String query,
- String expectedContent)
+ Map<OperatorKey, LogicalOperator> opTable,
+ PigContext pigContext,
+ Map<String, LogicalOperator> aliasOp,
+ Map<String, String> fileNameMap,
+ String query,
+ String expectedContent)
throws Exception {
// Run the parser
ByteArrayInputStream in = new ByteArrayInputStream(query.getBytes());
- QueryParser parser = new QueryParser(in, pigContext, "scope", aliases, opTable, aliasOp) ;
+ QueryParser parser = new QueryParser(in, pigContext, "scope", aliases, opTable, aliasOp,
+ fileNameMap) ;
LogicalPlan lp = parser.Parse() ;
// Digging down the tree
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Thu Apr 23 16:57:16 2009
@@ -19,6 +19,10 @@
import static org.junit.Assert.assertEquals;
+import java.util.*;
+
+import org.apache.pig.ExecType;
+
import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;
@@ -27,6 +31,9 @@
import java.util.List;
import java.util.Random;
+import junit.framework.Assert;
+import org.apache.pig.impl.plan.OperatorKey;
+
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
@@ -37,16 +44,29 @@
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.PigServer;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -58,6 +78,7 @@
static MiniCluster cluster = MiniCluster.buildCluster();
PigContext pc;
POProject proj;
+ PigServer pig;
@Before
public void setUp() throws Exception {
@@ -65,32 +86,39 @@
fSpec = new FileSpec("file:/tmp/storeTest.txt",
new FuncSpec(PigStorage.class.getName(), new String[]{":"}));
st.setSFile(fSpec);
- pc = new PigContext();
- pc.connect();
- st.setPc(pc);
+
+ FileLocalizer.deleteTempFiles();
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pc = pig.getPigContext();
+
+ st.setStoreImpl(new LocalPOStoreImpl(pc));
proj = GenPhyOp.exprProject();
proj.setColumn(0);
proj.setResultType(DataType.TUPLE);
proj.setOverloaded(true);
List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
- inps.add(proj);
- st.setInputs(inps);
-
}
@After
public void tearDown() throws Exception {
}
+ private boolean store() throws Exception {
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(proj);
+ pp.add(st);
+ pp.connect(proj, st);
+ return new LocalPigLauncher().launchPig(pp, "TestStore", pc);
+ }
+
@Test
- public void testStore() throws ExecException, IOException {
+ public void testStore() throws Exception {
inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- Result res = st.store();
- assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ assertTrue(store());
int size = 0;
BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
@@ -112,13 +140,12 @@
}
@Test
- public void testStoreComplexData() throws ExecException, IOException {
+ public void testStoreComplexData() throws Exception {
inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- Result res = st.store();
- assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ assertTrue(store());
PigStorage ps = new PigStorage(":");
int size = 0;
@@ -144,15 +171,14 @@
}
@Test
- public void testStoreComplexDataWithNull() throws ExecException, IOException {
+ public void testStoreComplexDataWithNull() throws Exception {
Tuple inputTuple = GenRandomData.genRandSmallBagTextTupleWithNulls(new Random(), 10, 100);
inpDB = DefaultBagFactory.getInstance().newDefaultBag();
inpDB.add(inputTuple);
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- Result res = st.store();
- assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ assertTrue(store());
PigStorage ps = new PigStorage(":");
int size = 0;
@@ -179,4 +205,80 @@
FileLocalizer.delete(fSpec.getFileName(), pc);
}
+ @Test
+ public void testStoreRemoteRel() throws Exception {
+ checkStorePath("test","/tmp/test");
+ }
+
+ @Test
+ public void testStoreRemoteAbs() throws Exception {
+ checkStorePath("/tmp/test","/tmp/test");
+ }
+
+ @Test
+ public void testStoreRemoteRelScheme() throws Exception {
+ checkStorePath("test","/tmp/test");
+ }
+
+ @Test
+ public void testStoreRemoteAbsScheme() throws Exception {
+ checkStorePath("hdfs:/tmp/test","/tmp/test");
+ }
+
+ @Test
+ public void testStoreRemoteAbsAuth() throws Exception {
+ checkStorePath("hdfs://localhost:9000/test","/test");
+ }
+
+ @Test
+ public void testStoreRemoteNormalize() throws Exception {
+ checkStorePath("/tmp/foo/../././","/tmp");
+ }
+
+ private void checkStorePath(String orig, String expected) throws Exception {
+ checkStorePath(orig, expected, false);
+ }
+
+ private void checkStorePath(String orig, String expected, boolean isTmp) throws Exception {
+ pc.getProperties().setProperty("opt.multiquery",""+true);
+
+ DataStorage dfs = pc.getDfs();
+ dfs.setActiveContainer(dfs.asContainer("/tmp"));
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+ Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
+ Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+ Map<String, String> fileNameMap = new HashMap<String, String>();
+
+ LogicalPlanBuilder builder = new LogicalPlanBuilder(pc);
+
+ String query = "a = load 'foo';";
+ LogicalPlan lp = builder.parse("Test-Store",
+ query,
+ aliases,
+ logicalOpTable,
+ aliasOp,
+ fileNameMap);
+ query = "store a into '"+orig+"';";
+ lp = builder.parse("Test-Store",
+ query,
+ aliases,
+ logicalOpTable,
+ aliasOp,
+ fileNameMap);
+
+ Assert.assertTrue(lp.size()>1);
+ LogicalOperator op = lp.getLeaves().get(0);
+
+ Assert.assertTrue(op instanceof LOStore);
+ LOStore store = (LOStore)op;
+
+ String p = store.getOutputFile().getFileName();
+ p = p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/","/");
+
+ if (isTmp) {
+ Assert.assertTrue(p.matches("/tmp.*"));
+ } else {
+ Assert.assertEquals(p, expected);
+ }
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java Thu Apr 23 16:57:16 2009
@@ -35,6 +35,7 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -193,6 +194,7 @@
Util.createInputFile(cluster, "a.txt", new String[] {"1\t2\t3", "4\t5\t6"});
Util.createInputFile(cluster, "b.txt", new String[] {"7\t8\t9", "1\t200\t300"});
Util.createInputFile(cluster, "c.txt", new String[] {"1\t20\t30"});
+ FileLocalizer.deleteTempFiles();
PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pig.registerQuery("a = load 'a.txt' ;");
pig.registerQuery("b = load 'b.txt';");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld Thu Apr 23 16:57:16 2009
@@ -1,17 +1,19 @@
MapReduce(-1) - -78:
-| Store(DummyFil:DummyLdr) - --7868505214447593853
+| Store(DummyFil:DummyLdr) - -5515425171581967372
| |
-| |---Filter[tuple] - --5177745552827005198
+| |---Filter[tuple] - --6061281703859425960
+| | |
+| | Constant(true) - --3396897091865664764
| |
-| |---Package[tuple]{Unknown} - --6430355428631435461
-| Local Rearrange[tuple]{Unknown}(false) - -8729990799028586395
+| |---Package[tuple]{Unknown} - --5758282087831209061
+| Local Rearrange[tuple]{Unknown}(false) - -3709512757404691843
| |
-| |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -77
+| |---Load(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -77
|
|---MapReduce(-1) - -75:
- | Store(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -76
+ | Store(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -76
| |
- | |---Package[tuple]{Unknown} - -4721502244557927278
- | Local Rearrange[tuple]{Unknown}(false) - --7681398237172009051
+ | |---Package[tuple]{Unknown} - --2057425961601007773
+ | Local Rearrange[tuple]{Unknown}(false) - --8361563503038121624
| |
- | |---Load(DummyFil:DummyLdr) - -6620645493024302760
\ No newline at end of file
+ | |---Load(DummyFil:DummyLdr) - -7506868571066332964
\ No newline at end of file
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld Thu Apr 23 16:57:16 2009
@@ -1,57 +1,63 @@
MapReduce(-1) - -11:
Reduce Plan Empty
-| Store(DummyFil:DummyLdr) - --2449990780759860228
+| Store(DummyFil:DummyLdr) - -7490898804471997380
| |
-| |---Union[tuple] - -4061122832660258194
+| |---Union[tuple] - -3720949273928245639
| |
-| |---Load(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -12
+| |---Load(file:/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -12
| |
-| |---Load(/tmp/temp-1456742965/tmp774375955:org.apache.pig.builtin.BinStorage) - -14
+| |---Load(file:/tmp/temp-1456742965/tmp774375955:org.apache.pig.builtin.BinStorage) - -14
|
|---MapReduce(-1) - -2:
-| | Store(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -13
+| | Store(file:/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -13
| | |
-| | |---Filter[tuple] - --171729478481529687
+| | |---Filter[tuple] - --2449990780759860228
+| | | |
+| | | Constant(true) - --8248982303554009
| | |
-| | |---Package[tuple]{Unknown} - -3527883492192621891
+| | |---Package[tuple]{Unknown} - -4061122832660258194
| | Union[tuple] - -3
| | |
-| | |---Local Rearrange[tuple]{Unknown}(false) - --2655303127943013956
+| | |---Local Rearrange[tuple]{Unknown}(false) - -3527883492192621891
| | | |
-| | | |---Load(DummyFil:DummyLdr) - --3833933141637499382
+| | | |---Load(DummyFil:DummyLdr) - --6402314745592504008
| | |
-| | |---Local Rearrange[tuple]{Unknown}(false) - -7473175511145418837
+| | |---Local Rearrange[tuple]{Unknown}(false) - -8637487025682524492
| | |
-| | |---Filter[tuple] - --6402314745592504008
+| | |---Filter[tuple] - -7473175511145418837
+| | | |
+| | | Constant(true) - --2655303127943013956
| | |
-| | |---Load(DummyFil:DummyLdr) - --838807233869503381
+| | |---Load(DummyFil:DummyLdr) - --3833933141637499382
|
|---MapReduce(-1) - -6:
- | Store(/tmp/temp-1456742965/tmp774375955:org.apache.pig.builtin.BinStorage) - -15
+ | Store(file:/tmp/temp-1456742965/tmp774375955:org.apache.pig.builtin.BinStorage) - -15
| |
- | |---Package[tuple]{Unknown} - -990040854696137546
- | Load(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -7
+ | |---Package[tuple]{Unknown} - -5679595123645092366
+ | Load(file:/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -7
|
- | Load(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -9
+ | Load(file:/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -9
|
|---MapReduce(30) - -4:
- | | Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -8
+ | | Store(file:/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -8
| | |
- | | |---Local Rearrange[tuple]{Unknown}(false) - --5623550231721294978
+ | | |---Local Rearrange[tuple]{Unknown}(false) - --8216215966586363937
| | |
- | | |---Package[tuple]{Unknown} - --6259721534861268730
- | | Local Rearrange[tuple]{Unknown}(false) - --7212359720440714287
+ | | |---Package[tuple]{Unknown} - --7212359720440714287
+ | | Local Rearrange[tuple]{Unknown}(false) - -7469509242284658386
| | |
- | | |---Load(DummyFil:DummyLdr) - -6748240903696823165
+ | | |---Load(DummyFil:DummyLdr) - -990040854696137546
|
|---MapReduce(20) - -5:
- | Store(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -10
+ | Store(file:/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -10
| |
- | |---Local Rearrange[tuple]{Unknown}(false) - -5679595123645092366
+ | |---Local Rearrange[tuple]{Unknown}(false) - --5623550231721294978
| |
- | |---Package[tuple]{Unknown} - -8345455294066939854
- | Local Rearrange[tuple]{Unknown}(false) - -2043312794799763441
+ | |---Package[tuple]{Unknown} - --6259721534861268730
+ | Local Rearrange[tuple]{Unknown}(false) - -3248199015665744565
| |
| |---Filter[tuple] - -6520791719738296531
+ | | |
+ | | Constant(true) - -2043312794799763441
| |
| |---Load(DummyFil:DummyLdr) - --5314747545448923824
\ No newline at end of file