You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/03 04:57:26 UTC

svn commit: r1563765 [1/2] - in /pig/trunk: ./ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/fetch/ ...

Author: cheolsoo
Date: Mon Feb  3 03:57:25 2014
New Revision: 1563765

URL: http://svn.apache.org/r1563765
Log:
PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java
    pig/trunk/test/org/apache/pig/test/TestFetch.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    pig/trunk/test/e2e/pig/tests/cmdline.conf
    pig/trunk/test/e2e/pig/tests/negative.conf
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/test/TestAssert.java
    pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb  3 03:57:25 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo)
+
 PIG-3730: Performance issue in SelfSpillBag (rajesh.balamohan via rohini)
 
 PIG-3654: Add class cache to PigContext (tmwoodruff via daijy)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Mon Feb  3 03:57:25 2014
@@ -62,6 +62,7 @@
 #pig.skewedjoin.reduce.memusage=0.3
 #pig.exec.nocombiner=false
 #opt.multiquery=true
+#opt.fetch=true
 
 #Following parameters are for configuring intermediate storage format
 #Supported storage types are seqfile and tfile

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java Mon Feb  3 03:57:25 2014
@@ -367,7 +367,8 @@ public class CSVExcelStorage extends Pig
         // further records to it. If they are the same (this would 
         // happen if multiple small files each with a header were combined
         // into one split), we know to skip the duplicate header record as well.
-        if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER && splitIndex == 0) {
+        if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER &&
+                (splitIndex == 0 || splitIndex == -1)) {
             try {
                 if (!in.nextKeyValue())
                     return null;

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java Mon Feb  3 03:57:25 2014
@@ -295,7 +295,7 @@ public class FixedWidthLoader extends Lo
     
     @Override
     public Tuple getNext() throws IOException {
-        if (loadingFirstRecord && skipHeader && splitIndex == 0) {
+        if (loadingFirstRecord && skipHeader && (splitIndex == 0 || splitIndex == -1)) {
             try {
                 if (!reader.nextKeyValue()) 
                     return null;

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Mon Feb  3 03:57:25 2014
@@ -208,6 +208,7 @@ public class Main {
             opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
             opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED);
             opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+            opts.registerOpt('N', "no_fetch", CmdLineParser.ValueExpected.NOT_ACCEPTED);
             opts.registerOpt('P', "propertyFile", CmdLineParser.ValueExpected.REQUIRED);
 
             ExecMode mode = ExecMode.UNKNOWN;
@@ -300,6 +301,10 @@ public class Main {
                     properties.setProperty("opt.multiquery",""+false);
                     break;
 
+                case 'N':
+                    properties.setProperty(PigConfiguration.OPT_FETCH,""+false);
+                    break;
+
                 case 'p':
                     params.add(opts.getValStr());
                     break;
@@ -863,6 +868,7 @@ public class Main {
             System.out.println("    -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.");
             System.out.println("    -F, -stop_on_failure - Aborts execution on the first failed job; default is off");
             System.out.println("    -M, -no_multiquery - Turn multiquery optimization off; default is on");
+            System.out.println("    -N, -no_fetch - Turn fetch optimization off; default is on");
             System.out.println("    -P, -propertyFile - Path to property file");
             System.out.println("    -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including");
             System.out.println("                     any environment variables that are set by the pig command.");
@@ -885,6 +891,8 @@ public class Main {
             System.out.println("            Only disable combiner as a temporary workaround for problems.");
             System.out.println("        opt.multiquery=true|false; multiquery is on by default.");
             System.out.println("            Only disable multiquery as a temporary workaround for problems.");
+            System.out.println("        opt.fetch=true|false; fetch is on by default.");
+            System.out.println("            Scripts containing Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs.");
             System.out.println("        pig.tmpfilecompression=true|false; compression is off by default.");
             System.out.println("            Determines whether output of intermediate jobs is compressed.");
             System.out.println("        pig.tmpfilecompression.codec=lzo|gzip; default is gzip.");

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb  3 03:57:25 2014
@@ -148,5 +148,11 @@ public class PigConfiguration {
      * Controls the max threshold size to convert jobs to run in local mode
      */
     public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
+    
+    /**
+     * This parameter enables/disables fetching. By default it is turned on.
+     */
+    public static final String OPT_FETCH = "opt.fetch";
+    
 }
 

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Mon Feb  3 03:57:25 2014
@@ -104,6 +104,7 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.SimpleFetchPigStats;
 
 /**
  *
@@ -418,6 +419,12 @@ public class PigServer {
      */
     protected List<ExecJob> getJobs(PigStats stats) {
         LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
+        if (stats instanceof SimpleFetchPigStats) {
+            HJob job = new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, stats.result(null)
+                    .getPOStore(), null);
+            jobs.add(job);
+            return jobs;
+        }
         JobGraph jGraph = stats.getJobGraph();
         Iterator<JobStats> iter = jGraph.iterator();
         while (iter.hasNext()) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Feb  3 03:57:25 2014
@@ -40,7 +40,8 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -355,6 +356,13 @@ public abstract class HExecutionEngine i
 
         try {
             PhysicalPlan pp = compile(lp, pc.getProperties());
+            //if the compiled physical plan fulfills the requirements of the
+            //fetch optimizer, then further transformations / MR jobs creations are
+            //skipped; a SimpleFetchPigStats will be returned through which the result
+            //can be directly fetched from the underlying storage
+            if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+                return new FetchLauncher(pc).launchPig(pp);
+            }
             return launcher.launchPig(pp, grpName, pigContext);
         } catch (ExecException e) {
             throw (ExecException) e;
@@ -385,6 +393,10 @@ public abstract class HExecutionEngine i
             pp.explain(pps, format, verbose);
 
             MapRedUtil.checkLeafIsStore(pp, pigContext);
+            if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+                new FetchLauncher(pigContext).explain(pp, pc, eps, format);
+                return;
+            }
             launcher.explain(pp, pigContext, eps, format, verbose);
         } finally {
             launcher.reset();

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Feb  3 03:57:25 2014
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.SimpleFetchPigStats;
+import org.joda.time.DateTimeZone;
+
+/**
+ * This class is responsible for executing the fetch task, saving the result to disk
+ * and do the necessary cleanup afterwards.
+ *
+ */
+public class FetchLauncher {
+
+    private final PigContext pigContext;
+    private final Configuration conf;
+
+    public FetchLauncher(PigContext pigContext) {
+        this.pigContext = pigContext;
+        this.conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+    }
+
+    /**
+     * Runs the fetch task by executing chain of calls on the PhysicalPlan from the leaf
+     * up to the LoadFunc
+     *
+     * @param pp - Physical plan
+     * @return SimpleFetchPigStats instance representing the fetched result
+     * @throws IOException
+     */
+    public PigStats launchPig(PhysicalPlan pp) throws IOException {
+        POStore poStore = (POStore) pp.getLeaves().get(0);
+        init(pp, poStore);
+
+        // run fetch
+        runPipeline(poStore);
+
+        UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
+                new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
+        udfFinisher.visit();
+
+        return PigStats.start(new SimpleFetchPigStats(pigContext, poStore));
+    }
+
+    /**
+     * Creates an empty MR plan
+     *
+     * @param pp - Physical plan
+     * @param pc - PigContext
+     * @param ps - PrintStream to write the plan to
+     * @param format format of the output plan
+     * @throws PlanException
+     * @throws VisitorException
+     * @throws IOException
+     */
+    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps, String format)
+            throws PlanException, VisitorException, IOException {
+        if ("xml".equals(format)) {
+            ps.println("<mapReducePlan>No MR jobs. Fetch only</mapReducePlan>");
+        }
+        else {
+            ps.println("#--------------------------------------------------");
+            ps.println("# Map Reduce Plan                                  ");
+            ps.println("#--------------------------------------------------");
+            ps.println("No MR jobs. Fetch only.");
+        }
+        return;
+    }
+
+    private void init(PhysicalPlan pp, POStore poStore) throws IOException {
+
+        poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
+        poStore.setUp();
+        if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
+            MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
+        }
+
+        PhysicalOperator.setReporter(new FetchProgressableReporter());
+        SchemaTupleBackend.initialize(conf, pigContext);
+
+        UDFContext udfContext = UDFContext.getUDFContext();
+        udfContext.addJobConf(conf);
+        udfContext.setClientSystemProps(pigContext.getProperties());
+        udfContext.serialize(conf);
+
+        PigMapReduce.sJobConfInternal.set(conf);
+        String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
+        if (dtzStr != null && dtzStr.length() > 0) {
+            // ensure that the internal timezone is uniformly in UTC offset style
+            DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
+        }
+    }
+
+    private void runPipeline(POStore posStore) throws IOException {
+        while (true) {
+            Result res = posStore.getNextTuple();
+            if (res.returnStatus == POStatus.STATUS_OK)
+                continue;
+
+            if (res.returnStatus == POStatus.STATUS_EOP) {
+                posStore.tearDown();
+                return;
+            }
+
+            if (res.returnStatus == POStatus.STATUS_NULL)
+                continue;
+
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                String errMsg;
+                if(res.result != null) {
+                    errMsg = "Fetch failed. Couldn't retrieve result: " + res.result;
+                } else {
+                    errMsg = "Fetch failed. Couldn't retrieve result";
+                }
+                int errCode = 2088;
+                ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
+                throw ee;
+            }
+        }
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Mon Feb  3 03:57:25 2014
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * FetchOptimizer determines whether the entire physical plan is fetchable, meaning
+ * that the task's result can be directly read (fetched) from the underlying storage
+ * rather than creating MR jobs. During the check {@link FetchablePlanVisitor} is used
+ * to walk through the plan.
+ *
+ */
+public class FetchOptimizer {
+    private static final Log LOG = LogFactory.getLog(FetchOptimizer.class);
+
+    /**
+     * Checks whether the fetch is enabled
+     *
+     * @param pc
+     * @return true if fetching is enabled
+     */
+    public static boolean isFetchEnabled(PigContext pc) {
+        return "true".equalsIgnoreCase(
+                pc.getProperties().getProperty(PigConfiguration.OPT_FETCH, "true"));
+    }
+
+    /**
+     * Visits the plan with {@link FetchablePlanVisitor} and checks whether the
+     * plan is fetchable.
+     *
+     * @param pc PigContext
+     * @param pp the physical plan to be examined
+     * @return true if the plan is fetchable
+     * @throws VisitorException
+     */
+    public static boolean isPlanFetchable(PigContext pc, PhysicalPlan pp) throws VisitorException {
+        if (isEligible(pc, pp)) {
+            FetchablePlanVisitor fpv = new FetchablePlanVisitor(pc, pp);
+            fpv.visit();
+            boolean isFetchable = fpv.isPlanFetchable();
+            //initialization
+            if (isFetchable)
+                init(pp);
+            return isFetchable;
+        }
+        return false;
+    }
+
+    private static void init(PhysicalPlan pp) throws VisitorException {
+        //mark POStream ops 'fetchable'
+        LinkedList<POStream> posList = PlanHelper.getPhysicalOperators(pp, POStream.class);
+        for (POStream pos : posList) {
+            pos.setFetchable(true);
+        }
+    }
+
+    /**
+     * Checks whether the plan fulfills the prerequisites needed for fetching.
+     *
+     * @param pc PigContext
+     * @param pp the physical plan to be examined
+     * @return
+     */
+    private static boolean isEligible(PigContext pc, PhysicalPlan pp) {
+        if (!isFetchEnabled(pc)) {
+            return false;
+        }
+
+        List<PhysicalOperator> roots = pp.getRoots();
+        for (PhysicalOperator po : roots) {
+            if (!(po instanceof POLoad)) {
+                String msg = "Expected physical operator at root is POLoad. Found : "
+                        + po.getClass().getCanonicalName() + ". Fetch optimizer will be disabled.";
+                LOG.debug(msg);
+                return false;
+            }
+        }
+
+        //consider single leaf jobs only
+        int leafSize = pp.getLeaves().size();
+        if (pp.getLeaves().size() != 1) {
+            LOG.debug("Expected physical plan should have one leaf. Found " + leafSize);
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * A plan is considered 'fetchable' if:
+     * <pre>
+     * - it contains only: LIMIT, FILTER, FOREACH, STREAM, UNION(no implicit SPLIT is allowed)
+     * - no STORE
+     * - no scalar aliases ({@link org.apache.pig.impl.builtin.ReadScalars ReadScalars})
+     * - {@link org.apache.pig.LoadFunc LoadFunc} is not a {@link org.apache.pig.impl.builtin.SampleLoader SampleLoader}
+     * </pre>
+     */
+    private static class FetchablePlanVisitor extends PhyPlanVisitor {
+
+        private boolean planFetchable = true;
+        private PigContext pc;
+
+        public FetchablePlanVisitor(PigContext pc, PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.pc = pc;
+        }
+
+        @Override
+        public void visit() throws VisitorException {
+            new PhyPlanSetter(mPlan).visit();
+            super.visit();
+        }
+
+        @Override
+        public void visitLoad(POLoad ld) throws VisitorException{
+            if (ld.getLoadFunc() instanceof SampleLoader) {
+                planFetchable = false;
+            }
+        }
+
+        @Override
+        public void visitStore(POStore st) throws VisitorException{
+            String basePathName = st.getSFile().getFileName();
+
+            //plan is fetchable if POStore belongs to EXPLAIN
+            if ("fakefile".equals(basePathName)) {
+                return;
+            }
+
+            //Otherwise check if target storage format equals to the intermediate storage format
+            //and its path points to a temporary storage path
+            boolean hasTmpStorageClass = st.getStoreFunc().getClass()
+                .equals(Utils.getTmpFileStorageClass(pc.getProperties()));
+
+            try {
+                boolean hasTmpTargetPath = isTempPath(basePathName);
+                if (!(hasTmpStorageClass && hasTmpTargetPath)) {
+                    planFetchable = false;
+                }
+            }
+            catch (IOException e) {
+                String msg = "Internal error. Could not retrieve temporary store location.";
+                throw new VisitorException(msg, e);
+            }
+        }
+
+        @Override
+        public void visitNative(PONative nat) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitSplit(POSplit spl) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitDemux(PODemux demux) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitCounter(POCounter poCounter) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitRank(PORank rank) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitDistinct(PODistinct distinct) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitSort(POSort sort) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitCross(POCross cross) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitFRJoin(POFRJoin join) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach)
+                throws VisitorException {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitPreCombinerLocalRearrange(
+                POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+            planFetchable = false;
+        }
+
+        @Override
+        public void visitPartialAgg(POPartialAgg poPartialAgg) {
+            planFetchable = false;
+        }
+
+        private boolean isPlanFetchable() {
+            return planFetchable;
+        }
+        
+        private boolean isTempPath(String basePathName) throws DataStorageException {
+            String tdir = pc.getProperties().getProperty("pig.temp.dir", "/tmp");
+            String tempStore = pc.getDfs().asContainer(tdir + "/temp").toString();
+            Matcher matcher = Pattern.compile(tempStore + "-?[0-9]+").matcher(basePathName);
+            return matcher.lookingAt();
+        }
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Mon Feb  3 03:57:25 2014
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * This class is used to have a POStore write the output to the underlying storage
+ * via a output collector/record writer in case of a fetch task. It sets up dummy context
+ * objects which otherwise would be initialized by the Hadoop job itself.
+ */
+public class FetchPOStoreImpl extends POStoreImpl {
+
+    private PigContext pc;
+    private RecordWriter<?, ?> writer;
+    private TaskAttemptContext context;
+    private OutputCommitter outputCommitter;
+
+    public FetchPOStoreImpl(PigContext pc) {
+        this.pc = pc;
+    }
+
+    @Override
+    public StoreFuncInterface createStoreFunc(POStore store) throws IOException {
+
+        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+        StoreFuncInterface storeFunc = store.getStoreFunc();
+        JobContext jc = HadoopShims.createJobContext(conf, new JobID());
+
+        OutputFormat<?, ?> outputFormat = storeFunc.getOutputFormat();
+        PigOutputFormat.setLocation(jc, store);
+        context = HadoopShims.createTaskAttemptContext(conf, HadoopShims.getNewTaskAttemptID());
+        PigOutputFormat.setLocation(context, store);
+
+        try {
+            outputFormat.checkOutputSpecs(jc);
+        }
+        catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+
+        try {
+            outputCommitter = outputFormat.getOutputCommitter(context);
+            outputCommitter.setupJob(jc);
+            outputCommitter.setupTask(context);
+            writer = outputFormat.getRecordWriter(context);
+        }
+        catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+        storeFunc.prepareToWrite(writer);
+        return storeFunc;
+    }
+
+    @Override
+    public void tearDown() throws IOException {
+        if (writer != null) {
+            try {
+                writer.close(context);
+            }
+            catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            writer = null;
+        }
+        if (outputCommitter.needsTaskCommit(context))
+            outputCommitter.commitTask(context);
+        HadoopShims.commitOrCleanup(outputCommitter, context);
+    }
+
+    @Override
+    public void cleanUp() throws IOException {
+        if (writer != null) {
+            try {
+                writer.close(context);
+            }
+            catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            writer = null;
+        }
+        HadoopShims.commitOrCleanup(outputCommitter, context);
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java Mon Feb  3 03:57:25 2014
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+
+/**
+ * A dummy ProgressableReporter used for fetch tasks
+ *
+ */
+public class FetchProgressableReporter implements PigProgressable {
+
+    private static final Log LOG = LogFactory.getLog(FetchProgressableReporter.class);
+
+    public void progress() {
+
+    }
+
+    public void progress(String msg) {
+        LOG.info(msg);
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb  3 03:57:25 2014
@@ -685,27 +685,13 @@ public class JobControlCompiler{
                     if(!pigContext.inIllustrator)
                         mro.reducePlan.remove(st);
                 }
-
-                // set out filespecs
-                String outputPathString = st.getSFile().getFileName();
-                if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
-                    conf.set("pig.streaming.log.dir",
-                            new Path(outputPathString, LOG_DIR).toString());
-                } else {
-                    String tmpLocationStr =  FileLocalizer
-                            .getTemporaryPath(pigContext).toString();
-                    tmpLocation = new Path(tmpLocationStr);
-                    conf.set("pig.streaming.log.dir",
-                            new Path(tmpLocation, LOG_DIR).toString());
-                }
-                conf.set("pig.streaming.task.output.dir", outputPathString);
+                
+                MapRedUtil.setupStreamingDirsConfSingle(st, pigContext, conf);
             }
             else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
                 log.info("Setting up multi store job");
-                String tmpLocationStr =  FileLocalizer
-                        .getTemporaryPath(pigContext).toString();
-                tmpLocation = new Path(tmpLocationStr);
-
+                MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf);
+                
                 nwJob.setOutputFormatClass(PigOutputFormat.class);
 
                 boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
@@ -718,10 +704,6 @@ public class JobControlCompiler{
                     sto.setMultiStore(true);
                     sto.setIndex(idx++);
                 }
-
-                conf.set("pig.streaming.log.dir",
-                        new Path(tmpLocation, LOG_DIR).toString());
-                conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
             }
 
             // store map key type

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb  3 03:57:25 2014
@@ -125,11 +125,7 @@ public class POUserFunc extends Expressi
     private void instantiateFunc(FuncSpec fSpec) {
         this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
         this.setSignature(signature);
-        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
-    	Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
-
-    	if(tmpS!=null)
-    		this.func.setInputSchema(tmpS);
+        this.setFuncInputSchema(signature);
         if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
             executor = new MonitoredUDFExecutor(func);
         }
@@ -604,4 +600,17 @@ public class POUserFunc extends Expressi
             this.func.setUDFContextSignature(signature);
         }
     }
+
+    /**
+     * Sets EvalFunc's inputschema based on the signature
+     * @param signature
+     */
+    public void setFuncInputSchema(String signature) {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
+        Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
+        if(tmpS!=null) {
+            this.func.setInputSchema(tmpS);
+        }
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Feb  3 03:57:25 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -32,9 +31,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -62,6 +59,14 @@ public class POStream extends PhysicalOp
 
     protected boolean allOutputFromBinaryProcessed = false;
 
+    /**
+     * This flag indicates whether streaming is done through fetching. If set,
+     * {@link FetchLauncher} pulls out the data from the pipeline. Therefore we need to
+     * skip the case in {@link #getNextTuple()} which is called by map() or reduce() when
+     * processing the next tuple.
+     */
+    private boolean isFetchable;
+
     public POStream(OperatorKey k, ExecutableManager executableManager, 
                       StreamingCommand command, Properties properties) {
         super(k);
@@ -170,7 +175,7 @@ public class POStream extends PhysicalOp
             // if we are here, we haven't consumed all input to be sent
             // to the streaming binary - check if we are being called
             // from close() on the map or reduce
-            if(this.parentPlan.endOfAllInput) {
+            if(isFetchable || this.parentPlan.endOfAllInput) {
                 Result r = getNextHelper((Tuple)null);
                 if(r.returnStatus == POStatus.STATUS_EOP) {
                     // we have now seen *ALL* possible input
@@ -373,4 +378,19 @@ public class POStream extends PhysicalOp
       }
       return (Tuple) out;
     }
+
+    /**
+     * @return true if streaming is done through fetching
+     */
+    public boolean isFetchable() {
+        return isFetchable;
+    }
+
+    /**
+     * @param isFetchable - whether fetching is applied on POStream
+     */
+    public void setFetchable(boolean isFetchable) {
+        this.isFetchable = isFetchable;
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Feb  3 03:57:25 2014
@@ -41,6 +41,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -174,6 +175,48 @@ public class MapRedUtil {
         }
     }
     
+    /**
+     * Sets up output and log dir paths for a single-store streaming job
+     *
+     * @param st - POStore of the current job
+     * @param pigContext
+     * @param conf
+     * @throws IOException
+     */
+    public static void setupStreamingDirsConfSingle(POStore st, PigContext pigContext,
+            Configuration conf) throws IOException {
+        // set out filespecs
+        String outputPathString = st.getSFile().getFileName();
+        if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
+            conf.set("pig.streaming.log.dir",
+                    new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
+        }
+        else {
+            String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+            Path tmpLocation = new Path(tmpLocationStr);
+            conf.set("pig.streaming.log.dir",
+                    new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+        }
+        conf.set("pig.streaming.task.output.dir", outputPathString);
+    }
+
+    /**
+     * Sets up output and log dir paths for a multi-store streaming job
+     *
+     * @param pigContext
+     * @param conf
+     * @throws IOException
+     */
+    public static void setupStreamingDirsConfMulti(PigContext pigContext, Configuration conf)
+            throws IOException {
+
+        String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+        Path tmpLocation = new Path(tmpLocationStr);
+        conf.set("pig.streaming.log.dir",
+                new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+        conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
+    }
+
     public static FileSpec checkLeafIsStore(
             PhysicalPlan plan,
             PigContext pigContext) throws ExecException {

Modified: pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java Mon Feb  3 03:57:25 2014
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 
 public class PropertiesUtil {
     private static final String DEFAULT_PROPERTIES_FILE = "/pig-default.properties";
@@ -143,6 +144,11 @@ public class PropertiesUtil {
             //by default we keep going on error on the backend
             properties.setProperty("stop.on.failure", ""+false);
         }
+
+        if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) {
+            //by default fetch optimization is on
+            properties.setProperty(PigConfiguration.OPT_FETCH, ""+true);
+        }
     }
     
     /**

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Feb  3 03:57:25 2014
@@ -25,9 +25,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.SequenceInputStream;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketImplFactory;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
@@ -334,7 +331,7 @@ public class Utils {
     }
 
     public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
-        Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorage(ConfigurationUtil.toProperties(conf)).getStorageClass();
+        Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorageClass(ConfigurationUtil.toProperties(conf));
         try {
             return storageClass.newInstance();
         } catch (InstantiationException e) {
@@ -344,6 +341,10 @@ public class Utils {
         }
     }
 
+    public static Class<? extends FileInputLoadFunc> getTmpFileStorageClass(Properties properties) {
+       return getTmpFileStorage(properties).getStorageClass();
+    }
+
     private static TEMPFILE_STORAGE getTmpFileStorage(Properties properties) {
         boolean tmpFileCompression = properties.getProperty(
                 PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "false").equals("true");

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Mon Feb  3 03:57:25 2014
@@ -506,6 +506,10 @@ public class ExpToPhyTranslationVisitor 
                     .getNextNodeId(DEFAULT_SCOPE)), -1,
                     null, op.getFuncSpec(), (EvalFunc) f);
             ((POUserFunc)p).setSignature(op.getSignature());
+            //reinitialize input schema from signature
+            if (((POUserFunc)p).getFunc().getInputSchema() == null) {
+                ((POUserFunc)p).setFuncInputSchema(op.getSignature());
+            }
             List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
             if (cacheFiles != null) {
                 ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));

Added: pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java Mon Feb  3 03:57:25 2014
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * SimpleFetchPigStats encapsulates dummy statistics of a fetch task, since during a fetch
+ * no MR jobs are executed
+ *
+ */
+public class SimpleFetchPigStats extends PigStats {
+
+    private final List<OutputStats> outputStatsList;
+    private final List<InputStats> inputStatsList;
+    private final JobGraph emptyJobPlan = new JobGraph();
+
+    public SimpleFetchPigStats(PigContext pigContext, POStore poStore) {
+
+        super.pigContext = pigContext;
+        super.startTime = super.endTime = System.currentTimeMillis();
+        super.userId = System.getProperty("user.name");
+
+        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+
+        //initalize empty stats
+        OutputStats os = new OutputStats(null, -1, -1, true);
+        os.setConf(conf);
+        os.setPOStore(poStore);
+        this.outputStatsList = Collections.unmodifiableList(Arrays.asList(os));
+
+        InputStats is = new InputStats(null, -1, -1, true);
+        is.setConf(conf);
+        this.inputStatsList = Collections.unmodifiableList(Arrays.asList(is));
+
+    }
+
+    @Override
+    public JobClient getJobClient() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isEmbedded() {
+        return false;
+    }
+
+    @Override
+    public Map<String, List<PigStats>> getAllStats() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> getAllErrorMessages() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public JobGraph getJobGraph() {
+       return emptyJobPlan;
+    }
+
+    @Override
+    public List<String> getOutputLocations() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<String> getOutputNames() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public long getNumberBytes(String location) {
+        return -1L;
+    }
+
+    @Override
+    public long getNumberRecords(String location) {
+        return -1L;
+    }
+
+    @Override
+    public String getOutputAlias(String location) {
+        return null;
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        return 0L;
+    }
+
+    @Override
+    public long getProactiveSpillCountRecords() {
+        return 0L;
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        return 0L;
+    }
+
+    @Override
+    public long getBytesWritten() {
+        return 0L;
+    }
+
+    @Override
+    public long getRecordWritten() {
+        return 0L;
+    }
+
+    @Override
+    public int getNumberJobs() {
+        return 0;
+    }
+
+    @Override
+    public List<OutputStats> getOutputStats() {
+        return outputStatsList;
+    }
+
+    @Override
+    public OutputStats result(String alias) {
+        return outputStatsList.get(0);
+    }
+
+    @Override
+    public List<InputStats> getInputStats() {
+        return inputStatsList;
+    }
+
+    @Override
+    public void setBackendException(String jobId, Exception e) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void start() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void stop() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberSuccessfulJobs() {
+        return -1;
+    }
+
+    @Override
+    public int getNumberFailedJobs() {
+        return -1;
+    }
+
+}

Modified: pig/trunk/test/e2e/pig/tests/cmdline.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/cmdline.conf?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/cmdline.conf (original)
+++ pig/trunk/test/e2e/pig/tests/cmdline.conf Mon Feb  3 03:57:25 2014
@@ -68,6 +68,7 @@ describe A;\,
 #                        #JIRA[PIG-373]
 #			{
 #			'num' => 4,
+#			'java_params' => ['-Dopt.fetch=false'],
 #			'pig' => q\ 
 #A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray);
 #describe A;
@@ -240,6 +241,7 @@ describe D;\,
 		
 			{
 			'num' => 1,
+			'java_params' => ['-Dopt.fetch=false'],
 			'pig' => q\ 
 A = load ':INPATH:/singlefile/unicode100' as (name:chararray);
 dump A;\,

Modified: pig/trunk/test/e2e/pig/tests/negative.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/negative.conf?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/negative.conf (original)
+++ pig/trunk/test/e2e/pig/tests/negative.conf Mon Feb  3 03:57:25 2014
@@ -34,6 +34,7 @@ $cfg = {
 		'tests' => [
 			{
 			'num' => 1,
+			'java_params' => ['-Dopt.fetch=false'],
 			'pig' => q\
 a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = group a by name;
@@ -48,6 +49,7 @@ dump c;\,
 		'tests' => [
 			{
 			'num' => 1,
+			'java_params' => ['-Dopt.fetch=false'],
 			'pig' => q\a = load '/user/gates/nosuchfile'; dump a;\,
 			'expected_err_regex' => "ERROR 2118: Input path does not exist",
 			},
@@ -248,6 +250,7 @@ store a into ':INPATH:/singlefile/fileex
                         {
 			# missing quotes around command
                         'num' => 1,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = foreach A generate $2, $1, $0;
@@ -259,6 +262,7 @@ dump C;#,
                         {
 			# input spec missing parenthesis
                         'num' => 2,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define CMD `perl PigStreaming.pl foo -` input 'foo' using PigStorage() ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -269,6 +273,7 @@ dump B;#,
                         {
 			# no serializer name after using
                         'num' => 3,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define CMD `perl PigStreaming.pl foo -` output ('foo' using );
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -279,6 +284,7 @@ dump B;#,
                         {
 			# alias name missing from define
                         'num' => 4,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define `perl PigStreaming.pl foo -`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -289,6 +295,7 @@ dump B;#,
                         {
 			# quotes missing from name of the file in ship script
                         'num' => 5,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define CMD `perl PigStreaming.pl foo -` ship(:SCRIPTHOMEPATH:/PigStreaming.pl);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -306,6 +313,7 @@ dump B;#,
 			# Define uses using non-existent command (autoship)
                         'num' => 1,
 			'execonly' => 'mapred',
+			'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreamingNotThere.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -316,6 +324,7 @@ dump B;\,
                         {
 			# Define uses non-existent command with ship clause
                         'num' => 2,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreamingNotThere.pl foo -` ship(':SCRIPTHOMEPATH:/PigStreamingNotThere.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -339,6 +348,7 @@ dump E;\,
                         {
 			# Define uses non-existent serializer
                         'num' => 4,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl foo -` input('foo' using SerializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -349,6 +359,7 @@ dump B;\,
                         {
 			# Define uses non-existent deserializer
                         'num' => 5,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl` output(stdout using DeserializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -359,6 +370,7 @@ dump B;\,
                         {
 			# Invalid skip path
                         'num' => 6,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 set stream.skippath 'foo';
 define CMD `perl PigStreaming.pl`;
@@ -370,6 +382,7 @@ dump B;\,
                         {
 			# Invalid command alias in stream operator
                         'num' => 7,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -380,6 +393,7 @@ dump B;\,
                         {
 			# Invalid operator alias in stream operator
                         'num' => 8,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -483,6 +497,7 @@ store D into ':OUTPATH:';\,
 			# Define uses using non-existent command
                         'num' => 1,
 						'execonly' => 'local',
+						'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreamingNotThere.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Feb  3 03:57:25 2014
@@ -4499,9 +4499,9 @@ store C into ':OUTPATH:';\, 
                 },
                 {
                     # Test Union using merge with incompatible types.  float->bytearray and chararray->bytearray
-			        'num' => 8,
-			        'delimiter' => '	',
-			        'pig' => q\
+                    'num' => 8,
+                    'delimiter' => '	',
+                    'pig' => q\
 A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int);
 B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:chararray);
 C = union onschema A, B;
@@ -4511,17 +4511,18 @@ A = load ':INPATH:/singlefile/studenttab
 B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray);
 C = union A, B;
 store C into ':OUTPATH:';\,
- 				}
-		]
+                }
+              ]
 
             },
-			{
+            {
 
-			# Test Union using merge with Simple data types
-	        'name' => 'UdfDistributedCache',
+            # Test Union using merge with Simple data types
+            'name' => 'UdfDistributedCache',
             'tests' => [
-            	{
-            		'num' => 1,
+                {
+                    'num' => 1,
+                    'java_params' => ['-Dopt.fetch=false'],
                     'execonly' => 'mapred', # since distributed cache is not supported in local mode
                     'pig' => q?
                         register :FUNCPATH:/testudf.jar;
@@ -4531,8 +4532,8 @@ store C into ':OUTPATH:';\,
                         c = foreach b generate udfdc(age);
                         dump c;?,
                     'expected_out_regex' => ":UdfDistributedCache_1_out:",
-            	},
-	        ]
+                },
+              ]
             }, {
                 'name' => 'MonitoredUDF',
                 'tests' => [

Modified: pig/trunk/test/org/apache/pig/test/TestAssert.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAssert.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAssert.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAssert.java Mon Feb  3 03:57:25 2014
@@ -22,10 +22,12 @@ import static org.apache.pig.builtin.moc
 import static org.apache.pig.builtin.mock.Storage.tuple;
 
 import java.util.List;
+import java.util.Properties;
 
 import junit.framework.Assert;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
@@ -84,9 +86,36 @@ public class TestAssert {
       try {
           pigServer.openIterator("A");
       } catch (FrontendException fe) {
+            Assert.assertTrue(fe.getCause().getCause().getMessage().contains("Assertion violated"));
+      }
+  }
+
+  /**
+   * Verify that ASSERT operator works. Disable fetch for this testcase.
+   * @throws Exception
+   */
+  @Test
+  public void testNegativeWithoutFetch() throws Exception {
+      PigServer pigServer = new PigServer(ExecType.LOCAL);
+      Data data = resetData(pigServer);
+
+      data.set("foo",
+              tuple(1),
+              tuple(2),
+              tuple(3)
+              );
+
+      pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (i:int);");
+      pigServer.registerQuery("ASSERT A BY i > 1 , 'i should be greater than 1';");
+
+      Properties props = pigServer.getPigContext().getProperties();
+      props.setProperty(PigConfiguration.OPT_FETCH, "false");
+      try {
+          pigServer.openIterator("A");
+      } catch (FrontendException fe) {
           Assert.assertTrue(fe.getCause().getMessage().contains(
                   "Job terminated with anomalous status FAILED"));
       }
-       
   }
+
 }

Modified: pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java Mon Feb  3 03:57:25 2014
@@ -78,6 +78,7 @@ public class TestAutoLocalMode {
     @Before
     public void setUp() throws Exception{
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.OPT_FETCH, "false");
         pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, String.valueOf("true"));
         pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, "200");
 

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Feb  3 03:57:25 2014
@@ -29,10 +29,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.BinStorage;
@@ -1011,17 +1013,22 @@ public class TestEvalPipeline2 {
         
         Iterator<Tuple> iter = pigServer.openIterator("e");
         
+        Map<Object, Object> expected = new HashMap<Object, Object>(3);
+        expected.put(1, null);
+        expected.put(2, null);
+        expected.put(4, null);
+
         Tuple t = iter.next();
         Assert.assertTrue(t.size()==1);
-        Assert.assertTrue((Integer)t.get(0)==1);
+        Assert.assertTrue(expected.containsKey(t.get(0)));
         
         t = iter.next();
         Assert.assertTrue(t.size()==1);
-        Assert.assertTrue((Integer)t.get(0)==4);
+        Assert.assertTrue(expected.containsKey(t.get(0)));
         
         t = iter.next();
         Assert.assertTrue(t.size()==1);
-        Assert.assertTrue((Integer)t.get(0)==2);
+        Assert.assertTrue(expected.containsKey(t.get(0)));
         
         Assert.assertFalse(iter.hasNext());
     }
@@ -1595,10 +1602,36 @@ public class TestEvalPipeline2 {
             pigServer.openIterator("b");
             Assert.fail();
         } catch (Exception e) {
-            Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
+            String message = e.getCause().getCause().getMessage();
+            Assert.assertTrue(message.contains(ArrayList.class.getName()));
         }
     }
     
+    // See PIG-1826
+    @Test
+    public void testNonStandardDataWithoutFetch() throws Exception{
+        Properties props = pigServer.getPigContext().getProperties();
+        props.setProperty(PigConfiguration.OPT_FETCH, "false");
+        String[] input1 = {
+                "0",
+        };
+        try {
+            Util.createInputFile(cluster, "table_testNonStandardDataWithoutFetch", input1);
+            pigServer.registerQuery("a = load 'table_testNonStandardDataWithoutFetch' as (a0);");
+            pigServer.registerQuery("b = foreach a generate " + UDFWithNonStandardType.class.getName() + "(a0);");
+
+            try {
+                pigServer.openIterator("b");
+                Assert.fail();
+            } catch (Exception e) {
+                Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
+            }
+        }
+        finally {
+            props.setProperty(PigConfiguration.OPT_FETCH, "true");
+        }
+    }
+
     // See PIG-2078
     @Test
     public void testProjectNullBag() throws Exception{

Added: pig/trunk/test/org/apache/pig/test/TestFetch.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFetch.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFetch.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestFetch.java Mon Feb  3 03:57:25 2014
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.parser.ParserTestingUtils;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFetch {
+
+    private PigServer pigServer;
+
+    private static File inputFile1;
+    private static File inputFile2;
+
+    private static final long SEED = 1013;
+    private static final Random r = new Random(SEED);
+
+    @BeforeClass
+    public static void setUpOnce() throws Exception {
+
+        String[] data1 = {
+            "1 {(1,2,7,8,b),(1,3,3,5,a)}",
+            "2 {(2,4,6,6,k)}",
+            "3 {(3,7,8,9,p),(3,6,3,1,n)}",
+            "5 {(5,1,1,2,c)}"
+        };
+
+        String[] data2 = {
+            "1 3 a",
+            "1 2 b",
+            "2 4 k",
+            "3 6 n",
+            "3 7 p",
+            "5 1 c"
+        };
+
+        inputFile1 = Util.createInputFile("tmp", "testFetchData1.txt", data1);
+        inputFile2 = Util.createInputFile("tmp", "testFetchData2.txt", data2);
+
+    }
+
+    @Before
+    public void setUp() throws Exception{
+        pigServer = new PigServer(ExecType.LOCAL, new Properties());
+    }
+
+    @Test
+    public void test1() throws Exception {
+        String query =
+            "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' " +
+                 "using PigStorage(' ') as (a:int, b: " +
+                   "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});" +
+            "C = foreach A {" +
+            "  temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);" +
+            "  temp2 = filter temp1 by key < 400;" +
+            "  temp3 = limit temp2 3;" +
+            "  temp4 = foreach temp3 generate key-r as (id:int);" +
+            "  temp5 = limit temp4 4;" +
+            "  temp6 = filter temp5 by id < 100;" +
+            "  generate flatten(temp6) as (id:int), a;" +
+            "};" +
+            "D = foreach C generate (" +
+            "  case id % 4" +
+            "    when 0 then true" +
+            "    else false" +
+            "  end" +
+            ") as (check:boolean);";
+
+        LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query);
+
+        PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine())
+                .compile(lp, null);
+
+        boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+        assertTrue(planFetchable);
+
+    }
+
+    @Test
+    public void test2() throws Exception {
+        Properties properties = pigServer.getPigContext().getProperties();
+        properties.setProperty(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "gz");
+        properties.setProperty(PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "true");
+        properties.setProperty(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "tfile");
+
+        String query =
+            "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' " +
+                 "using PigStorage(' ') as (a:int, b: " +
+                   "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});" +
+            "C = foreach A {" +
+            "  temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);" +
+            "  temp2 = filter temp1 by key < 400;" +
+            "  temp3 = limit temp2 3;" +
+            "  temp4 = foreach temp3 generate key-r as (id:int);" +
+            "  temp5 = limit temp4 4;" +
+            "  temp6 = filter temp5 by id < 100;" +
+            "  generate flatten(temp6) as (id:int), a;" +
+            "};" +
+            "D = foreach C generate (" +
+            "  case id % 4" +
+            "    when 0 then true" +
+            "    else false" +
+            "  end" +
+            ") as (check:boolean);" +
+            "store D into 'out' using org.apache.pig.impl.io.TFileStorage();";
+
+        LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query);
+
+        PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine())
+                .compile(lp, null);
+
+        boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+        assertFalse(planFetchable);
+
+    }
+
+    @Test
+    public void test3() throws Exception {
+        File scriptFile = null;
+        try {
+            String[] script = {
+                    "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' ",
+                    "using PigStorage(' ') as (a:int, b: ",
+                    "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});",
+                    "C = foreach A {",
+                    "  temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);",
+                    "  temp2 = filter temp1 by key < 400;",
+                    "  temp3 = limit temp2 3;",
+                    "  temp4 = foreach temp3 generate key-r as (id:int);",
+                    "  temp5 = limit temp4 4;",
+                    "  temp6 = filter temp5 by id < 100;",
+                    "  generate flatten(temp6) as (id:int), a;",
+                    "};",
+                    "D = foreach C generate (",
+                    "  case id % 4",
+                    "    when 0 then true",
+                    "    else false",
+                    "  end",
+                    ") as (check:boolean);"
+            };
+
+            scriptFile = Util.createLocalInputFile( "testFetchTest3.pig", script);
+            pigServer.registerScript(scriptFile.getAbsolutePath());
+
+            Iterator<Tuple> it = pigServer.openIterator("D");
+            while (it.hasNext()) {
+                assertEquals(false, it.next().get(0));
+                assertEquals(true, it.next().get(0));
+            }
+        }
+        finally {
+            if (scriptFile != null) {
+                scriptFile.delete();
+            }
+        }
+    }
+
+    @Test
+    public void test4() throws Exception {
+        File scriptFile = null;
+        try {
+            String[] script = {
+                "A = load '"+Util.encodeEscape(inputFile2.getAbsolutePath()) +"' ",
+                     "using PigStorage(' ') as (a:int, b:int, c:chararray);",
+                "B = limit A 2;",
+                "C = limit A 1;",
+                "D = union A,B,C;" //introduces an implicit split operator
+            };
+
+            scriptFile = Util.createLocalInputFile( "testFetchTest4.pig", script);
+            pigServer.registerScript(scriptFile.getAbsolutePath());
+            pigServer.setBatchOn();
+
+            LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer);
+            PhysicalPlan pp = ((MRExecutionEngine)
+                    pigServer.getPigContext().getExecutionEngine()).compile(lp, null);
+            boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+            assertFalse(planFetchable);
+
+        }
+        finally {
+            if (scriptFile != null) {
+                scriptFile.delete();
+            }
+        }
+    }
+
+    @Test
+    public void test5() throws Exception {
+
+        File scriptFile = null;
+        try {
+            String[] script = {
+                "A = load '"+Util.encodeEscape(inputFile2.getAbsolutePath()) +"' ",
+                "using PigStorage(' ') as (a:int, b:int, c:chararray);",
+                "B = group A by a;"
+            };
+
+            scriptFile = Util.createLocalInputFile( "testFetchTest5.pig", script);
+            pigServer.registerScript(scriptFile.getAbsolutePath());
+            pigServer.setBatchOn();
+
+            LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer);
+            PhysicalPlan pp = ((MRExecutionEngine)
+                    pigServer.getPigContext().getExecutionEngine()).compile(lp, null);
+            boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+            assertFalse(planFetchable);
+
+        }
+        finally {
+            if (scriptFile != null) {
+                scriptFile.delete();
+            }
+        }
+    }
+
+    @Test
+    public void test6() throws Exception {
+        PigContext pc = pigServer.getPigContext();
+
+        PhysicalPlan pp = new PhysicalPlan();
+        POLoad poLoad = GenPhyOp.topLoadOp();
+        pp.add(poLoad);
+        POLimit poLimit = new POLimit(new OperatorKey("", r.nextLong()), -1, null);
+        pp.add(poLimit);
+        pp.connect(poLoad, poLimit);
+        POStore poStore = GenPhyOp.topStoreOp();
+        pp.addAsLeaf(poStore);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        new FetchLauncher(pc).explain(pp, pc, ps, "xml");
+        assertTrue(baos.toString().matches("(?si).*No MR jobs. Fetch only.*"));
+
+    }
+
+    @AfterClass
+    public static void tearDownOnce() throws Exception {
+        inputFile1.delete();
+        inputFile2.delete();
+    }
+
+}