You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/30 22:35:57 UTC
svn commit: r990934 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/
Author: thejas
Date: Mon Aug 30 20:35:57 2010
New Revision: 990934
URL: http://svn.apache.org/viewvc?rev=990934&view=rev
Log:
PIG-1570: native mapreduce operator MR job does not follow same failure handling logic as other pig MR jobs
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 20:35:57 2010
@@ -171,6 +171,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1570: native mapreduce operator MR job does not follow same failure handling logic as other pig MR jobs (thejas)
+
PIG-1343: pig_log file missing even though Main tells it is creating one and
an M/R job fails (nrai via rding)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Aug 30 20:35:57 2010
@@ -191,17 +191,18 @@ public class MapReduceLauncher extends L
// Initially, all jobs are in wait state.
List<Job> jobsWithoutIds = jc.getWaitingJobs();
log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for submission.");
-
+ //notify listeners about jobs submitted
ScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
- String jobTrackerAdd;
- String port;
+ // determine job tracker url
String jobTrackerLoc;
JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
try {
- port = jobConf.get("mapred.job.tracker.http.address");
- jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
- jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":")) + port.substring(port.indexOf(":"));
+ String port = jobConf.get("mapred.job.tracker.http.address");
+ String jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
+
+ jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"))
+ + port.substring(port.indexOf(":"));
}
catch(Exception e){
// Could not get the job tracker location, most probably we are running in local mode.
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java Mon Aug 30 20:35:57 2010
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.util.Arrays;
+
import org.apache.hadoop.util.RunJar;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.impl.plan.OperatorKey;
@@ -75,20 +77,32 @@ public class NativeMapReduceOper extends
RunJarSecurityManager secMan = new RunJarSecurityManager();
try {
RunJar.main(getNativeMRParams());
+ PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
} catch (SecurityException se) {
if(secMan.getExitInvoked()) {
if(secMan.getExitCode() != 0) {
throw new JobCreationException("Native job returned with non-zero return code");
}
else {
- PigStatsUtil.addNativeJobStats(PigStats.get(), this);
+ PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
}
}
} catch (Throwable t) {
- throw new JobCreationException("Cannot run native mapreduce job "+ t.getMessage());
+ JobCreationException e = new JobCreationException(
+ "Cannot run native mapreduce job "+ t.getMessage(), t);
+ PigStatsUtil.addNativeJobStats(PigStats.get(), this, false, e);
+ throw e;
} finally {
secMan.retire();
}
}
+ @Override
+ public String name(){
+ return "MapReduce - " + mKey.toString() + "\n"
+ + " Native MapReduce - jar : " + nativeMRJar + ", params: " + Arrays.toString(params) ;
+
+
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Aug 30 20:35:57 2010
@@ -43,7 +43,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon Aug 30 20:35:57 2010
@@ -294,15 +294,23 @@ public abstract class PigStatsUtil {
return js;
}
- public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr) {
+ public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
+ boolean success) {
+ return addNativeJobStats(ps, mr, success, null);
+ }
+
+ public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
+ boolean success, Exception e) {
JobStats js = ps.addJobStatsForNative(mr);
if(js == null) {
LOG.warn("unable to add native job stats");
} else {
- js.setSuccessful(true);
+ js.setSuccessful(success);
+ if(e != null)
+ js.setBackendException(e);
}
return js;
- }
+ }
private static JobStats accumulateSuccessStatistics(PigStats ps, Job job) {
JobStats js = ps.addJobStats(job);
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Mon Aug 30 20:35:57 2010
@@ -72,6 +72,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LONative;
import org.apache.pig.impl.logicalLayer.LOSort;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOStream;
@@ -491,17 +492,18 @@ public class ScriptState {
if (mro instanceof NativeMapReduceOper) {
feature.set(PIG_FEATURE.NATIVE.ordinal());
}
- try {
- new FeatureVisitor(mro.mapPlan, feature).visit();
- if (mro.reducePlan.isEmpty()) {
- feature.set(PIG_FEATURE.MAP_ONLY.ordinal());
- } else {
- new FeatureVisitor(mro.reducePlan, feature).visit();
+ else{// if it is native MR, don't explore its plans
+ try {
+ new FeatureVisitor(mro.mapPlan, feature).visit();
+ if (mro.reducePlan.isEmpty()) {
+ feature.set(PIG_FEATURE.MAP_ONLY.ordinal());
+ } else {
+ new FeatureVisitor(mro.reducePlan, feature).visit();
+ }
+ } catch (VisitorException e) {
+ LOG.warn("Feature visitor failed", e);
}
- } catch (VisitorException e) {
- LOG.warn("Feature visitor failed", e);
}
-
StringBuilder sb = new StringBuilder();
for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
if (sb.length() > 0) sb.append(",");
@@ -668,6 +670,12 @@ public class ScriptState {
protected void visit(LOUnion op) throws VisitorException {
feature.set(PIG_FEATURE.UNION.ordinal());
}
+
+ @Override
+ protected void visit(LONative n) throws VisitorException {
+ feature.set(PIG_FEATURE.NATIVE.ordinal());
+ }
+
}
private static class AliasVisitor extends PhyPlanVisitor {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Mon Aug 30 20:35:57 2010
@@ -17,43 +17,66 @@
*/
package org.apache.pig.test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.data.BagFactory;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.tools.pigstats.PigStats;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestNativeMapReduce {
- static MiniCluster cluster = MiniCluster.buildCluster();
- private PigServer pigServer;
+
+ //NOTE:
+ // Testing NativeMapReduce in LOCAL mode from unit test setup is not easy.
+ // (ie current WordCount.jar does not work as-is).
+ // the presence of ~/pigtest/conf/hadoop-site.xml created by MiniCluster
+ // in the class path makes the MR job try to contact the MiniCluster.
+ // if the MiniCluster shutdown is changed to delete the file, the other
+ // test cases fail because the file in classpath does not exist
+
// the jar has been created using the source at
// http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/WordCount.java:816822
private String jarFileName = "test//org/apache/pig/test/data/TestWordCount.jar";
private String exp_msg_prefix = "Check if expected results contains: ";
- TupleFactory mTf = TupleFactory.getInstance();
- BagFactory mBf = BagFactory.getInstance();
-
+ final static String INPUT_FILE = "TestMapReduceInputFile";
+ static MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer = null;
+
/**
* TODO - Move to runtime jar creation approach
private void createWordCountJar() {
}*/
+
+ @BeforeClass
+ public static void oneTimeSetup() throws Exception{
+ String[] input = {
+ "one",
+ "two",
+ "three",
+ "three",
+ "two",
+ "three"
+ };
+ Util.createInputFile(cluster, INPUT_FILE, input);
+ }
@Before
public void setUp() throws Exception{
- FileLocalizer.setR(new Random());
- //FileLocalizer.setInitialized(false);
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
//createWordCountJar();
@@ -61,272 +84,286 @@ public class TestNativeMapReduce {
@AfterClass
public static void oneTimeTearDown() throws Exception {
+ Util.deleteFile(cluster, INPUT_FILE);
cluster.shutDown();
}
+
// See PIG-506
@Test
public void testNativeMRJobSimple() throws Exception{
- String[] input = {
- "one",
- "two",
- "three",
- "three",
- "two",
- "three"
- };
- Util.createInputFile(cluster, "table_testNativeMRJobSimple", input);
+ try{
+ Collection<String> results = new HashSet<String>();
+ results.add("(one,1)");
+ results.add("(two,2)");
+ results.add("(three,3)");
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+ "Store A into 'table_testNativeMRJobSimple_input' "+
+ "Load 'table_testNativeMRJobSimple_output' "+
+ "`WordCount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`;");
+ pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
+ List<ExecJob> execJobs = pigServer.executeBatch();
+
+ assertEquals("num of jobs", execJobs.size(), 1);
+ boolean foundNativeFeature = false;
+ for(ExecJob job : execJobs){
+ assertEquals("job status", job.getStatus(),JOB_STATUS.COMPLETED);
+ if(job.getStatistics().getFeatures().contains("NATIVE")){
+ foundNativeFeature = true;
+ }
+ }
+ assertTrue("foundNativeFeature", foundNativeFeature);
+
+ // Check the output
+ pigServer.registerQuery("C = load 'table_testNativeMRJobSimpleDir';");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ Tuple t;
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+
+ // We have to manually delete intermediate mapreduce files
+ Util.deleteFile(cluster,"table_testNativeMRJobSimple_input");
+ Util.deleteFile(cluster,"table_testNativeMRJobSimple_output");
+
+ // check in interactive mode
+ iter = pigServer.openIterator("B");
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+ }
+ finally{
+ // We have to manually delete intermediate mapreduce files
+ Util.deleteFile(cluster,"table_testNativeMRJobSimple_input");
+ Util.deleteFile(cluster,"table_testNativeMRJobSimple_output");
+ Util.deleteFile(cluster,"table_testNativeMRJobSimpleDir");
+ }
+ }
- Collection<String> results = new HashSet<String>();
- results.add("(one,1)");
- results.add("(two,2)");
- results.add("(three,3)");
-
- pigServer.setBatchOn();
- pigServer.registerQuery("A = load 'table_testNativeMRJobSimple';");
- pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
- "Store A into 'table_testNativeMRJobSimple_input' "+
- "Load 'table_testNativeMRJobSimple_output' "+
- "`WordCount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`;");
- pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
- pigServer.executeBatch();
-
- // Check the output
- pigServer.registerQuery("C = load 'table_testNativeMRJobSimpleDir';");
-
- Iterator<Tuple> iter = pigServer.openIterator("C");
- Tuple t;
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertFalse(iter.hasNext());
-
- // We have to manually delete intermediate mapreduce files
- Util.deleteFile(cluster, "table_testNativeMRJobSimple_input");
- Util.deleteFile(cluster, "table_testNativeMRJobSimple_output");
-
- // check in interactive mode
- iter = pigServer.openIterator("B");
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertFalse(iter.hasNext());
-
- // We have to manually delete intermediate mapreduce files
- Util.deleteFile(cluster, "table_testNativeMRJobSimple_input");
- Util.deleteFile(cluster, "table_testNativeMRJobSimple_output");
-
- Util.deleteFile(cluster, "table_testNativeMRJobSimple");
- Util.deleteFile(cluster, "table_testNativeMRJobSimpleDir");
+
+ @Test
+ public void testNativeMRJobSimpleFailure() throws Exception{
+ try{
+ //test if correct return code is obtained when query fails
+ // the native MR job is writing to an existing file and should fail
+
+ Collection<String> results = new HashSet<String>();
+ results.add("(one,1)");
+ results.add("(two,2)");
+ results.add("(three,3)");
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+ "Store A into 'table_testNativeMRJobSimple_input' "+
+ "Load 'table_testNativeMRJobSimple_output' "+
+ "`WordCount table_testNativeMRJobSimple_input " + INPUT_FILE + "`;");
+ pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
+// List<ExecJob> execJobs = pigServer.executeBatch();
+
+
+ assertTrue("job failed", PigStats.get().getReturnCode() != 0);
+
+ }
+ finally{
+ // We have to manually delete intermediate mapreduce files
+ Util.deleteFile(cluster, "table_testNativeMRJobSimple_input");
+ Util.deleteFile(cluster, "table_testNativeMRJobSimpleDir");
+ }
}
+
// See PIG-506
@Test
public void testNativeMRJobMultiStoreOnPred() throws Exception{
- String[] input = {
- "one",
- "two",
- "three",
- "three",
- "two",
- "three"
- };
- Util.createInputFile(cluster, "table_testNativeMRJobMultiStoreOnPred", input);
+ try{
- Collection<String> results = new HashSet<String>();
- results.add("(one,1)");
- results.add("(two,2)");
- results.add("(three,3)");
-
- pigServer.setBatchOn();
- pigServer.registerQuery("A = load 'table_testNativeMRJobMultiStoreOnPred';");
- pigServer.registerQuery("Store A into 'testNativeMRJobMultiStoreOnPredTemp';");
- pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
- "Store A into 'table_testNativeMRJobMultiStoreOnPred_input' "+
- "Load 'table_testNativeMRJobMultiStoreOnPred_output' "+
- "`WordCount table_testNativeMRJobMultiStoreOnPred_input table_testNativeMRJobMultiStoreOnPred_output`;");
- pigServer.registerQuery("Store B into 'table_testNativeMRJobMultiStoreOnPredDir';");
- pigServer.executeBatch();
-
- // Check the output
- pigServer.registerQuery("C = load 'table_testNativeMRJobMultiStoreOnPredDir';");
-
- Iterator<Tuple> iter = pigServer.openIterator("C");
- Tuple t;
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertFalse(iter.hasNext());
-
- Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_input");
- Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_output");
-
- // check in interactive mode
- iter = pigServer.openIterator("B");
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertFalse(iter.hasNext());
-
- Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_input");
- Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_output");
-
- Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred");
- Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPredDir");
+ Collection<String> results = new HashSet<String>();
+ results.add("(one,1)");
+ results.add("(two,2)");
+ results.add("(three,3)");
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+ pigServer.registerQuery("Store A into 'testNativeMRJobMultiStoreOnPredTemp';");
+ pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+ "Store A into 'table_testNativeMRJobMultiStoreOnPred_input' "+
+ "Load 'table_testNativeMRJobMultiStoreOnPred_output' "+
+ "`WordCount table_testNativeMRJobMultiStoreOnPred_input table_testNativeMRJobMultiStoreOnPred_output`;");
+ pigServer.registerQuery("Store B into 'table_testNativeMRJobMultiStoreOnPredDir';");
+ pigServer.executeBatch();
+
+ // Check the output
+ pigServer.registerQuery("C = load 'table_testNativeMRJobMultiStoreOnPredDir';");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ Tuple t;
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_input");
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_output");
+
+ // check in interactive mode
+ iter = pigServer.openIterator("B");
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+ }
+ finally{
+ Util.deleteFile(cluster,"testNativeMRJobMultiStoreOnPredTemp");
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_input");
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_output");
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPredDir");
+ }
}
// See PIG-506
@Test
public void testNativeMRJobMultiQueryOpt() throws Exception{
- String[] input = {
- "one",
- "two",
- "three",
- "three",
- "two",
- "three"
- };
- Util.createInputFile(cluster, "table_testNativeMRJobMultiQueryOpt", input);
-
- Collection<String> results = new HashSet<String>();
- results.add("(one,1)");
- results.add("(two,2)");
- results.add("(three,3)");
-
- pigServer.registerQuery("A = load 'table_testNativeMRJobMultiQueryOpt';");
- pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
- "Store A into 'table_testNativeMRJobMultiQueryOpt_inputB' "+
- "Load 'table_testNativeMRJobMultiQueryOpt_outputB' "+
- "`WordCount table_testNativeMRJobMultiQueryOpt_inputB table_testNativeMRJobMultiQueryOpt_outputB`;");
- pigServer.registerQuery("C = mapreduce '" + jarFileName + "' " +
- "Store A into 'table_testNativeMRJobMultiQueryOpt_inputC' "+
- "Load 'table_testNativeMRJobMultiQueryOpt_outputC' "+
- "`WordCount table_testNativeMRJobMultiQueryOpt_inputC table_testNativeMRJobMultiQueryOpt_outputC`;");
-
- Iterator<Tuple> iter = pigServer.openIterator("C");
- Tuple t;
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertFalse(iter.hasNext());
-
- iter = pigServer.openIterator("B");
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertFalse(iter.hasNext());
-
- Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt");
- Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_inputB");
- Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_outputB");
- Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_inputC");
- Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_outputC");
+ try{
+ Collection<String> results = new HashSet<String>();
+ results.add("(one,1)");
+ results.add("(two,2)");
+ results.add("(three,3)");
+
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+ "Store A into 'table_testNativeMRJobMultiQueryOpt_inputB' "+
+ "Load 'table_testNativeMRJobMultiQueryOpt_outputB' "+
+ "`WordCount table_testNativeMRJobMultiQueryOpt_inputB table_testNativeMRJobMultiQueryOpt_outputB`;");
+ pigServer.registerQuery("C = mapreduce '" + jarFileName + "' " +
+ "Store A into 'table_testNativeMRJobMultiQueryOpt_inputC' "+
+ "Load 'table_testNativeMRJobMultiQueryOpt_outputC' "+
+ "`WordCount table_testNativeMRJobMultiQueryOpt_inputC table_testNativeMRJobMultiQueryOpt_outputC`;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ Tuple t;
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+
+ iter = pigServer.openIterator("B");
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+ }finally{
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_inputB");
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_outputB");
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_inputC");
+ Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_outputC");
+ }
}
-
+
// See PIG-506
@Test
public void testNativeMRJobTypeCastInserter() throws Exception{
- String[] input = {
- "one",
- "two",
- "three",
- "three",
- "two",
- "three"
- };
- Util.createInputFile(cluster, "table_testNativeMRJobTypeCastInserter", input);
-
- Collection<String> results = new HashSet<String>();
- results.add("(2)");
- results.add("(3)");
- results.add("(4)");
-
- pigServer.registerQuery("A = load 'table_testNativeMRJobTypeCastInserter';");
- pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
- "Store A into 'table_testNativeMRJobTypeCastInserter_input' "+
- "Load 'table_testNativeMRJobTypeCastInserter_output' as (name:chararray, count: int)"+
- "`WordCount table_testNativeMRJobTypeCastInserter_input table_testNativeMRJobTypeCastInserter_output`;");
- pigServer.registerQuery("C = foreach B generate count+1;");
-
- Iterator<Tuple> iter = pigServer.openIterator("C");
- Tuple t;
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertTrue("iter.hasNext()",iter.hasNext());
- t = iter.next();
- assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
- assertFalse(iter.hasNext());
-
- Util.deleteFile(cluster, "table_testNativeMRJobTypeCastInserter");
- Util.deleteFile(cluster, "table_testNativeMRJobTypeCastInserter_input");
- Util.deleteFile(cluster, "table_testNativeMRJobTypeCastInserter_output");
+ try{
+ Collection<String> results = new HashSet<String>();
+ results.add("(2)");
+ results.add("(3)");
+ results.add("(4)");
+
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+ pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+ "Store A into 'table_testNativeMRJobTypeCastInserter_input' "+
+ "Load 'table_testNativeMRJobTypeCastInserter_output' as (name:chararray, count: int)"+
+ "`WordCount table_testNativeMRJobTypeCastInserter_input table_testNativeMRJobTypeCastInserter_output`;");
+ pigServer.registerQuery("C = foreach B generate count+1;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ Tuple t;
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertTrue("iter.hasNext()",iter.hasNext());
+ t = iter.next();
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+ assertFalse(iter.hasNext());
+ }finally{
+ Util.deleteFile(cluster,"table_testNativeMRJobTypeCastInserter_input");
+ Util.deleteFile(cluster,"table_testNativeMRJobTypeCastInserter_output");
+ }
}
+
}