Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/03 04:57:26 UTC
svn commit: r1563765 [1/2] - in /pig/trunk: ./ conf/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/fetch/ ...
Author: cheolsoo
Date: Mon Feb 3 03:57:25 2014
New Revision: 1563765
URL: http://svn.apache.org/r1563765
Log:
PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo)
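For illustration, a minimal sketch of the kind of query this change targets (the
input path and class name are hypothetical, not part of the patch): a plan built
only from fetch-friendly operators is now read directly from the underlying
storage when dumped, instead of being compiled into an MR job.

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class FetchDemo {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            // LOAD + FILTER + LIMIT forms a fetchable plan: dumping C reads the
            // records directly from HDFS, with no MR job submitted.
            pig.registerQuery("A = LOAD '/data/studenttab10k' AS (name:chararray, age:int, gpa:double);");
            pig.registerQuery("B = FILTER A BY gpa > 3.5;");
            pig.registerQuery("C = LIMIT B 10;");
            Iterator<Tuple> it = pig.openIterator("C");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }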
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java
pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java
pig/trunk/test/org/apache/pig/test/TestFetch.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
pig/trunk/test/e2e/pig/tests/cmdline.conf
pig/trunk/test/e2e/pig/tests/negative.conf
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/org/apache/pig/test/TestAssert.java
pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb 3 03:57:25 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3642: Direct HDFS access for small jobs (fetch) (lbendig via cheolsoo)
+
PIG-3730: Performance issue in SelfSpillBag (rajesh.balamohan via rohini)
PIG-3654: Add class cache to PigContext (tmwoodruff via daijy)
Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Mon Feb 3 03:57:25 2014
@@ -62,6 +62,7 @@
#pig.skewedjoin.reduce.memusage=0.3
#pig.exec.nocombiner=false
#opt.multiquery=true
+#opt.fetch=true
#Following parameters are for configuring intermediate storage format
#Supported storage types are seqfile and tfile
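A hedged sketch of toggling the new opt.fetch key programmatically (the property
name comes from this patch; the class name and input path are examples):

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class DisableFetch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // opt.fetch defaults to true; setting it to false forces MR execution
            props.setProperty("opt.fetch", "false");
            PigServer pig = new PigServer(ExecType.MAPREDUCE, props);
            pig.registerQuery("A = LOAD '/tmp/input' AS (line:chararray);");
            pig.openIterator("A"); // runs as a regular MR job, not a fetch
        }
    }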
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java Mon Feb 3 03:57:25 2014
@@ -367,7 +367,8 @@ public class CSVExcelStorage extends Pig
// further records to it. If they are the same (this would
// happen if multiple small files each with a header were combined
// into one split), we know to skip the duplicate header record as well.
- if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER && splitIndex == 0) {
+ if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER &&
+ (splitIndex == 0 || splitIndex == -1)) {
try {
if (!in.nextKeyValue())
return null;
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java Mon Feb 3 03:57:25 2014
@@ -295,7 +295,7 @@ public class FixedWidthLoader extends Lo
@Override
public Tuple getNext() throws IOException {
- if (loadingFirstRecord && skipHeader && splitIndex == 0) {
+ if (loadingFirstRecord && skipHeader && (splitIndex == 0 || splitIndex == -1)) {
try {
if (!reader.nextKeyValue())
return null;
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Mon Feb 3 03:57:25 2014
@@ -208,6 +208,7 @@ public class Main {
opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+ opts.registerOpt('N', "no_fetch", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('P', "propertyFile", CmdLineParser.ValueExpected.REQUIRED);
ExecMode mode = ExecMode.UNKNOWN;
@@ -300,6 +301,10 @@ public class Main {
properties.setProperty("opt.multiquery",""+false);
break;
+ case 'N':
+ properties.setProperty(PigConfiguration.OPT_FETCH,""+false);
+ break;
+
case 'p':
params.add(opts.getValStr());
break;
@@ -863,6 +868,7 @@ public class Main {
System.out.println(" -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.");
System.out.println(" -F, -stop_on_failure - Aborts execution on the first failed job; default is off");
System.out.println(" -M, -no_multiquery - Turn multiquery optimization off; default is on");
+ System.out.println(" -N, -no_fetch - Turn fetch optimization off; default is on");
System.out.println(" -P, -propertyFile - Path to property file");
System.out.println(" -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including");
System.out.println(" any environment variables that are set by the pig command.");
@@ -885,6 +891,8 @@ public class Main {
System.out.println(" Only disable combiner as a temporary workaround for problems.");
System.out.println(" opt.multiquery=true|false; multiquery is on by default.");
System.out.println(" Only disable multiquery as a temporary workaround for problems.");
+ System.out.println(" opt.fetch=true|false; fetch is on by default.");
+ System.out.println(" Scripts containing Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs.");
System.out.println(" pig.tmpfilecompression=true|false; compression is off by default.");
System.out.println(" Determines whether output of intermediate jobs is compressed.");
System.out.println(" pig.tmpfilecompression.codec=lzo|gzip; default is gzip.");
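The same switch is exposed on the command line in two ways, matching the help
text above (the script name is hypothetical):

    pig -N script.pig                    # short form of -no_fetch
    pig -Dopt.fetch=false script.pig     # equivalent, via the property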
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb 3 03:57:25 2014
@@ -148,5 +148,11 @@ public class PigConfiguration {
* Controls the max threshold size to convert jobs to run in local mode
*/
public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
+
+ /**
+ * This parameter enables/disables fetching. By default it is turned on.
+ */
+ public static final String OPT_FETCH = "opt.fetch";
+
}
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Mon Feb 3 03:57:25 2014
@@ -104,6 +104,7 @@ import org.apache.pig.tools.pigstats.Out
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.SimpleFetchPigStats;
/**
*
@@ -418,6 +419,12 @@ public class PigServer {
*/
protected List<ExecJob> getJobs(PigStats stats) {
LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
+ if (stats instanceof SimpleFetchPigStats) {
+ HJob job = new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, stats.result(null)
+ .getPOStore(), null);
+ jobs.add(job);
+ return jobs;
+ }
JobGraph jGraph = stats.getJobGraph();
Iterator<JobStats> iter = jGraph.iterator();
while (iter.hasNext()) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Feb 3 03:57:25 2014
@@ -40,7 +40,8 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -355,6 +356,13 @@ public abstract class HExecutionEngine i
try {
PhysicalPlan pp = compile(lp, pc.getProperties());
+ //if the compiled physical plan fulfills the requirements of the
+ //fetch optimizer, further transformations and MR job creation are
+ //skipped; a SimpleFetchPigStats is returned, through which the result
+ //can be fetched directly from the underlying storage
+ if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+ return new FetchLauncher(pc).launchPig(pp);
+ }
return launcher.launchPig(pp, grpName, pigContext);
} catch (ExecException e) {
throw (ExecException) e;
@@ -385,6 +393,10 @@ public abstract class HExecutionEngine i
pp.explain(pps, format, verbose);
MapRedUtil.checkLeafIsStore(pp, pigContext);
+ if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+ new FetchLauncher(pigContext).explain(pp, pc, eps, format);
+ return;
+ }
launcher.explain(pp, pigContext, eps, format, verbose);
} finally {
launcher.reset();
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Feb 3 03:57:25 2014
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.SimpleFetchPigStats;
+import org.joda.time.DateTimeZone;
+
+/**
+ * This class is responsible for executing the fetch task, saving the result to disk,
+ * and doing the necessary cleanup afterwards.
+ *
+ */
+public class FetchLauncher {
+
+ private final PigContext pigContext;
+ private final Configuration conf;
+
+ public FetchLauncher(PigContext pigContext) {
+ this.pigContext = pigContext;
+ this.conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+ }
+
+ /**
+ * Runs the fetch task by executing a chain of calls on the PhysicalPlan from the leaf
+ * up to the LoadFunc
+ *
+ * @param pp - Physical plan
+ * @return SimpleFetchPigStats instance representing the fetched result
+ * @throws IOException
+ */
+ public PigStats launchPig(PhysicalPlan pp) throws IOException {
+ POStore poStore = (POStore) pp.getLeaves().get(0);
+ init(pp, poStore);
+
+ // run fetch
+ runPipeline(poStore);
+
+ UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
+ new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
+ udfFinisher.visit();
+
+ return PigStats.start(new SimpleFetchPigStats(pigContext, poStore));
+ }
+
+ /**
+ * Writes an empty MR plan to the explain output, since a fetch task creates no MR jobs
+ *
+ * @param pp - Physical plan
+ * @param pc - PigContext
+ * @param ps - PrintStream to write the plan to
+ * @param format format of the output plan
+ * @throws PlanException
+ * @throws VisitorException
+ * @throws IOException
+ */
+ public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps, String format)
+ throws PlanException, VisitorException, IOException {
+ if ("xml".equals(format)) {
+ ps.println("<mapReducePlan>No MR jobs. Fetch only</mapReducePlan>");
+ }
+ else {
+ ps.println("#--------------------------------------------------");
+ ps.println("# Map Reduce Plan ");
+ ps.println("#--------------------------------------------------");
+ ps.println("No MR jobs. Fetch only.");
+ }
+ return;
+ }
+
+ private void init(PhysicalPlan pp, POStore poStore) throws IOException {
+
+ poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
+ poStore.setUp();
+ if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
+ MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
+ }
+
+ PhysicalOperator.setReporter(new FetchProgressableReporter());
+ SchemaTupleBackend.initialize(conf, pigContext);
+
+ UDFContext udfContext = UDFContext.getUDFContext();
+ udfContext.addJobConf(conf);
+ udfContext.setClientSystemProps(pigContext.getProperties());
+ udfContext.serialize(conf);
+
+ PigMapReduce.sJobConfInternal.set(conf);
+ String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
+ if (dtzStr != null && dtzStr.length() > 0) {
+ // ensure that the internal timezone is uniformly in UTC offset style
+ DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
+ }
+ }
+
+ private void runPipeline(POStore posStore) throws IOException {
+ while (true) {
+ Result res = posStore.getNextTuple();
+ if (res.returnStatus == POStatus.STATUS_OK)
+ continue;
+
+ if (res.returnStatus == POStatus.STATUS_EOP) {
+ posStore.tearDown();
+ return;
+ }
+
+ if (res.returnStatus == POStatus.STATUS_NULL)
+ continue;
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ String errMsg;
+ if(res.result != null) {
+ errMsg = "Fetch failed. Couldn't retrieve result: " + res.result;
+ } else {
+ errMsg = "Fetch failed. Couldn't retrieve result";
+ }
+ int errCode = 2088;
+ ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
+ throw ee;
+ }
+ }
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Mon Feb 3 03:57:25 2014
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * FetchOptimizer determines whether the entire physical plan is fetchable, meaning
+ * that the task's result can be directly read (fetched) from the underlying storage
+ * rather than computed by MR jobs. During the check, {@link FetchablePlanVisitor} is used
+ * to walk through the plan.
+ *
+ */
+public class FetchOptimizer {
+ private static final Log LOG = LogFactory.getLog(FetchOptimizer.class);
+
+ /**
+ * Checks whether fetch optimization is enabled
+ *
+ * @param pc PigContext
+ * @return true if fetching is enabled
+ */
+ public static boolean isFetchEnabled(PigContext pc) {
+ return "true".equalsIgnoreCase(
+ pc.getProperties().getProperty(PigConfiguration.OPT_FETCH, "true"));
+ }
+
+ /**
+ * Visits the plan with {@link FetchablePlanVisitor} and checks whether the
+ * plan is fetchable.
+ *
+ * @param pc PigContext
+ * @param pp the physical plan to be examined
+ * @return true if the plan is fetchable
+ * @throws VisitorException
+ */
+ public static boolean isPlanFetchable(PigContext pc, PhysicalPlan pp) throws VisitorException {
+ if (isEligible(pc, pp)) {
+ FetchablePlanVisitor fpv = new FetchablePlanVisitor(pc, pp);
+ fpv.visit();
+ boolean isFetchable = fpv.isPlanFetchable();
+ //on success, mark the plan's streaming operators as fetchable
+ if (isFetchable)
+ init(pp);
+ return isFetchable;
+ }
+ return false;
+ }
+
+ private static void init(PhysicalPlan pp) throws VisitorException {
+ //mark POStream ops 'fetchable'
+ LinkedList<POStream> posList = PlanHelper.getPhysicalOperators(pp, POStream.class);
+ for (POStream pos : posList) {
+ pos.setFetchable(true);
+ }
+ }
+
+ /**
+ * Checks whether the plan fulfills the prerequisites needed for fetching.
+ *
+ * @param pc PigContext
+ * @param pp the physical plan to be examined
+ * @return true if the plan is eligible for fetching
+ */
+ private static boolean isEligible(PigContext pc, PhysicalPlan pp) {
+ if (!isFetchEnabled(pc)) {
+ return false;
+ }
+
+ List<PhysicalOperator> roots = pp.getRoots();
+ for (PhysicalOperator po : roots) {
+ if (!(po instanceof POLoad)) {
+ String msg = "Expected physical operator at root is POLoad. Found: "
+ + po.getClass().getCanonicalName() + ". Fetch optimizer will be disabled.";
+ LOG.debug(msg);
+ return false;
+ }
+ }
+
+ //consider single leaf jobs only
+ int leafSize = pp.getLeaves().size();
+ if (leafSize != 1) {
+ LOG.debug("Expected the physical plan to have a single leaf. Found " + leafSize);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * A plan is considered 'fetchable' if:
+ * <pre>
+ * - it contains only: LIMIT, FILTER, FOREACH, STREAM, UNION (no implicit SPLIT is allowed)
+ * - no STORE
+ * - no scalar aliases ({@link org.apache.pig.impl.builtin.ReadScalars ReadScalars})
+ * - {@link org.apache.pig.LoadFunc LoadFunc} is not a {@link org.apache.pig.impl.builtin.SampleLoader SampleLoader}
+ * </pre>
+ */
+ private static class FetchablePlanVisitor extends PhyPlanVisitor {
+
+ private boolean planFetchable = true;
+ private PigContext pc;
+
+ public FetchablePlanVisitor(PigContext pc, PhysicalPlan plan) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ this.pc = pc;
+ }
+
+ @Override
+ public void visit() throws VisitorException {
+ new PhyPlanSetter(mPlan).visit();
+ super.visit();
+ }
+
+ @Override
+ public void visitLoad(POLoad ld) throws VisitorException{
+ if (ld.getLoadFunc() instanceof SampleLoader) {
+ planFetchable = false;
+ }
+ }
+
+ @Override
+ public void visitStore(POStore st) throws VisitorException{
+ String basePathName = st.getSFile().getFileName();
+
+ //plan is fetchable if POStore belongs to EXPLAIN
+ if ("fakefile".equals(basePathName)) {
+ return;
+ }
+
+ //Otherwise check if target storage format equals to the intermediate storage format
+ //and its path points to a temporary storage path
+ boolean hasTmpStorageClass = st.getStoreFunc().getClass()
+ .equals(Utils.getTmpFileStorageClass(pc.getProperties()));
+
+ try {
+ boolean hasTmpTargetPath = isTempPath(basePathName);
+ if (!(hasTmpStorageClass && hasTmpTargetPath)) {
+ planFetchable = false;
+ }
+ }
+ catch (IOException e) {
+ String msg = "Internal error. Could not retrieve temporary store location.";
+ throw new VisitorException(msg, e);
+ }
+ }
+
+ @Override
+ public void visitNative(PONative nat) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitPackage(POPackage pkg) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitSplit(POSplit spl) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitDemux(PODemux demux) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitCounter(POCounter poCounter) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitRank(PORank rank) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitDistinct(PODistinct distinct) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitSort(POSort sort) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitCross(POCross cross) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitFRJoin(POFRJoin join) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach)
+ throws VisitorException {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitPreCombinerLocalRearrange(
+ POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+ planFetchable = false;
+ }
+
+ @Override
+ public void visitPartialAgg(POPartialAgg poPartialAgg) {
+ planFetchable = false;
+ }
+
+ private boolean isPlanFetchable() {
+ return planFetchable;
+ }
+
+ private boolean isTempPath(String basePathName) throws DataStorageException {
+ String tdir = pc.getProperties().getProperty("pig.temp.dir", "/tmp");
+ String tempStore = pc.getDfs().asContainer(tdir + "/temp").toString();
+ Matcher matcher = Pattern.compile(tempStore + "-?[0-9]+").matcher(basePathName);
+ return matcher.lookingAt();
+ }
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Mon Feb 3 03:57:25 2014
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * This class is used to have a POStore write the output to the underlying storage
+ * via an output collector/record writer in the case of a fetch task. It sets up dummy context
+ * objects which otherwise would be initialized by the Hadoop job itself.
+ */
+public class FetchPOStoreImpl extends POStoreImpl {
+
+ private PigContext pc;
+ private RecordWriter<?, ?> writer;
+ private TaskAttemptContext context;
+ private OutputCommitter outputCommitter;
+
+ public FetchPOStoreImpl(PigContext pc) {
+ this.pc = pc;
+ }
+
+ @Override
+ public StoreFuncInterface createStoreFunc(POStore store) throws IOException {
+
+ Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+ StoreFuncInterface storeFunc = store.getStoreFunc();
+ JobContext jc = HadoopShims.createJobContext(conf, new JobID());
+
+ OutputFormat<?, ?> outputFormat = storeFunc.getOutputFormat();
+ PigOutputFormat.setLocation(jc, store);
+ context = HadoopShims.createTaskAttemptContext(conf, HadoopShims.getNewTaskAttemptID());
+ PigOutputFormat.setLocation(context, store);
+
+ try {
+ outputFormat.checkOutputSpecs(jc);
+ }
+ catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ try {
+ outputCommitter = outputFormat.getOutputCommitter(context);
+ outputCommitter.setupJob(jc);
+ outputCommitter.setupTask(context);
+ writer = outputFormat.getRecordWriter(context);
+ }
+ catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ storeFunc.prepareToWrite(writer);
+ return storeFunc;
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ if (writer != null) {
+ try {
+ writer.close(context);
+ }
+ catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ writer = null;
+ }
+ if (outputCommitter.needsTaskCommit(context))
+ outputCommitter.commitTask(context);
+ HadoopShims.commitOrCleanup(outputCommitter, context);
+ }
+
+ @Override
+ public void cleanUp() throws IOException {
+ if (writer != null) {
+ try {
+ writer.close(context);
+ }
+ catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ writer = null;
+ }
+ HadoopShims.commitOrCleanup(outputCommitter, context);
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchProgressableReporter.java Mon Feb 3 03:57:25 2014
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+
+/**
+ * A dummy ProgressableReporter used for fetch tasks
+ *
+ */
+public class FetchProgressableReporter implements PigProgressable {
+
+ private static final Log LOG = LogFactory.getLog(FetchProgressableReporter.class);
+
+ public void progress() {
+ // no-op: a fetch task requires no progress reporting
+ }
+
+ public void progress(String msg) {
+ LOG.info(msg);
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb 3 03:57:25 2014
@@ -685,27 +685,13 @@ public class JobControlCompiler{
if(!pigContext.inIllustrator)
mro.reducePlan.remove(st);
}
-
- // set out filespecs
- String outputPathString = st.getSFile().getFileName();
- if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
- conf.set("pig.streaming.log.dir",
- new Path(outputPathString, LOG_DIR).toString());
- } else {
- String tmpLocationStr = FileLocalizer
- .getTemporaryPath(pigContext).toString();
- tmpLocation = new Path(tmpLocationStr);
- conf.set("pig.streaming.log.dir",
- new Path(tmpLocation, LOG_DIR).toString());
- }
- conf.set("pig.streaming.task.output.dir", outputPathString);
+
+ MapRedUtil.setupStreamingDirsConfSingle(st, pigContext, conf);
}
else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
log.info("Setting up multi store job");
- String tmpLocationStr = FileLocalizer
- .getTemporaryPath(pigContext).toString();
- tmpLocation = new Path(tmpLocationStr);
-
+ MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf);
+
nwJob.setOutputFormatClass(PigOutputFormat.class);
boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
@@ -718,10 +704,6 @@ public class JobControlCompiler{
sto.setMultiStore(true);
sto.setIndex(idx++);
}
-
- conf.set("pig.streaming.log.dir",
- new Path(tmpLocation, LOG_DIR).toString());
- conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
}
// store map key type
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb 3 03:57:25 2014
@@ -125,11 +125,7 @@ public class POUserFunc extends Expressi
private void instantiateFunc(FuncSpec fSpec) {
this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
this.setSignature(signature);
- Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
- Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
-
- if(tmpS!=null)
- this.func.setInputSchema(tmpS);
+ this.setFuncInputSchema(signature);
if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
executor = new MonitoredUDFExecutor(func);
}
@@ -604,4 +600,17 @@ public class POUserFunc extends Expressi
this.func.setUDFContextSignature(signature);
}
}
+
+ /**
+ * Sets the EvalFunc's input schema based on the signature
+ * @param signature
+ */
+ public void setFuncInputSchema(String signature) {
+ Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
+ Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
+ if(tmpS!=null) {
+ this.func.setInputSchema(tmpS);
+ }
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Feb 3 03:57:25 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
@@ -32,9 +31,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -62,6 +59,14 @@ public class POStream extends PhysicalOp
protected boolean allOutputFromBinaryProcessed = false;
+ /**
+ * This flag indicates whether streaming is done through fetching. If set,
+ * {@link FetchLauncher} pulls the data out of the pipeline. Therefore we need to
+ * skip the case in {@link #getNextTuple()} which is called by map() or reduce() when
+ * processing the next tuple.
+ */
+ private boolean isFetchable;
+
public POStream(OperatorKey k, ExecutableManager executableManager,
StreamingCommand command, Properties properties) {
super(k);
@@ -170,7 +175,7 @@ public class POStream extends PhysicalOp
// if we are here, we haven't consumed all input to be sent
// to the streaming binary - check if we are being called
// from close() on the map or reduce
- if(this.parentPlan.endOfAllInput) {
+ if(isFetchable || this.parentPlan.endOfAllInput) {
Result r = getNextHelper((Tuple)null);
if(r.returnStatus == POStatus.STATUS_EOP) {
// we have now seen *ALL* possible input
@@ -373,4 +378,19 @@ public class POStream extends PhysicalOp
}
return (Tuple) out;
}
+
+ /**
+ * @return true if streaming is done through fetching
+ */
+ public boolean isFetchable() {
+ return isFetchable;
+ }
+
+ /**
+ * @param isFetchable - whether fetching is applied on POStream
+ */
+ public void setFetchable(boolean isFetchable) {
+ this.isFetchable = isFetchable;
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Feb 3 03:57:25 2014
@@ -41,6 +41,7 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -174,6 +175,48 @@ public class MapRedUtil {
}
}
+ /**
+ * Sets up output and log dir paths for a single-store streaming job
+ *
+ * @param st - POStore of the current job
+ * @param pigContext
+ * @param conf
+ * @throws IOException
+ */
+ public static void setupStreamingDirsConfSingle(POStore st, PigContext pigContext,
+ Configuration conf) throws IOException {
+ // set out filespecs
+ String outputPathString = st.getSFile().getFileName();
+ if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
+ conf.set("pig.streaming.log.dir",
+ new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
+ }
+ else {
+ String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+ Path tmpLocation = new Path(tmpLocationStr);
+ conf.set("pig.streaming.log.dir",
+ new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+ }
+ conf.set("pig.streaming.task.output.dir", outputPathString);
+ }
+
+ /**
+ * Sets up output and log dir paths for a multi-store streaming job
+ *
+ * @param pigContext
+ * @param conf
+ * @throws IOException
+ */
+ public static void setupStreamingDirsConfMulti(PigContext pigContext, Configuration conf)
+ throws IOException {
+
+ String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+ Path tmpLocation = new Path(tmpLocationStr);
+ conf.set("pig.streaming.log.dir",
+ new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+ conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
+ }
+
public static FileSpec checkLeafIsStore(
PhysicalPlan plan,
PigContext pigContext) throws ExecException {
Modified: pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java Mon Feb 3 03:57:25 2014
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
public class PropertiesUtil {
private static final String DEFAULT_PROPERTIES_FILE = "/pig-default.properties";
@@ -143,6 +144,11 @@ public class PropertiesUtil {
//by default we keep going on error on the backend
properties.setProperty("stop.on.failure", ""+false);
}
+
+ if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) {
+ //by default fetch optimization is on
+ properties.setProperty(PigConfiguration.OPT_FETCH, ""+true);
+ }
}
/**
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Feb 3 03:57:25 2014
@@ -25,9 +25,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.SequenceInputStream;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketImplFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
@@ -334,7 +331,7 @@ public class Utils {
}
public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
- Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorage(ConfigurationUtil.toProperties(conf)).getStorageClass();
+ Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorageClass(ConfigurationUtil.toProperties(conf));
try {
return storageClass.newInstance();
} catch (InstantiationException e) {
@@ -344,6 +341,10 @@ public class Utils {
}
}
+ public static Class<? extends FileInputLoadFunc> getTmpFileStorageClass(Properties properties) {
+ return getTmpFileStorage(properties).getStorageClass();
+ }
+
private static TEMPFILE_STORAGE getTmpFileStorage(Properties properties) {
boolean tmpFileCompression = properties.getProperty(
PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "false").equals("true");
Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Mon Feb 3 03:57:25 2014
@@ -506,6 +506,10 @@ public class ExpToPhyTranslationVisitor
.getNextNodeId(DEFAULT_SCOPE)), -1,
null, op.getFuncSpec(), (EvalFunc) f);
((POUserFunc)p).setSignature(op.getSignature());
+ //reinitialize input schema from signature
+ if (((POUserFunc)p).getFunc().getInputSchema() == null) {
+ ((POUserFunc)p).setFuncInputSchema(op.getSignature());
+ }
List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
if (cacheFiles != null) {
((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));
Added: pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/SimpleFetchPigStats.java Mon Feb 3 03:57:25 2014
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * SimpleFetchPigStats encapsulates dummy statistics for a fetch task, since no MR
+ * jobs are executed during a fetch
+ *
+ */
+public class SimpleFetchPigStats extends PigStats {
+
+ private final List<OutputStats> outputStatsList;
+ private final List<InputStats> inputStatsList;
+ private final JobGraph emptyJobPlan = new JobGraph();
+
+ public SimpleFetchPigStats(PigContext pigContext, POStore poStore) {
+
+ super.pigContext = pigContext;
+ super.startTime = super.endTime = System.currentTimeMillis();
+ super.userId = System.getProperty("user.name");
+
+ Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+
+ //initialize empty stats
+ OutputStats os = new OutputStats(null, -1, -1, true);
+ os.setConf(conf);
+ os.setPOStore(poStore);
+ this.outputStatsList = Collections.unmodifiableList(Arrays.asList(os));
+
+ InputStats is = new InputStats(null, -1, -1, true);
+ is.setConf(conf);
+ this.inputStatsList = Collections.unmodifiableList(Arrays.asList(is));
+
+ }
+
+ @Override
+ public JobClient getJobClient() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmbedded() {
+ return false;
+ }
+
+ @Override
+ public Map<String, List<PigStats>> getAllStats() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> getAllErrorMessages() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JobGraph getJobGraph() {
+ return emptyJobPlan;
+ }
+
+ @Override
+ public List<String> getOutputLocations() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> getOutputNames() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public long getNumberBytes(String location) {
+ return -1L;
+ }
+
+ @Override
+ public long getNumberRecords(String location) {
+ return -1L;
+ }
+
+ @Override
+ public String getOutputAlias(String location) {
+ return null;
+ }
+
+ @Override
+ public long getSMMSpillCount() {
+ return 0L;
+ }
+
+ @Override
+ public long getProactiveSpillCountRecords() {
+ return 0L;
+ }
+
+ @Override
+ public long getProactiveSpillCountObjects() {
+ return 0L;
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return 0L;
+ }
+
+ @Override
+ public long getRecordWritten() {
+ return 0L;
+ }
+
+ @Override
+ public int getNumberJobs() {
+ return 0;
+ }
+
+ @Override
+ public List<OutputStats> getOutputStats() {
+ return outputStatsList;
+ }
+
+ @Override
+ public OutputStats result(String alias) {
+ return outputStatsList.get(0);
+ }
+
+ @Override
+ public List<InputStats> getInputStats() {
+ return inputStatsList;
+ }
+
+ @Override
+ public void setBackendException(String jobId, Exception e) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void start() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void stop() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberSuccessfulJobs() {
+ return -1;
+ }
+
+ @Override
+ public int getNumberFailedJobs() {
+ return -1;
+ }
+
+}
Modified: pig/trunk/test/e2e/pig/tests/cmdline.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/cmdline.conf?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/cmdline.conf (original)
+++ pig/trunk/test/e2e/pig/tests/cmdline.conf Mon Feb 3 03:57:25 2014
@@ -68,6 +68,7 @@ describe A;\,
# #JIRA[PIG-373]
# {
# 'num' => 4,
+# 'java_params' => ['-Dopt.fetch=false'],
# 'pig' => q\
#A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray);
#describe A;
@@ -240,6 +241,7 @@ describe D;\,
{
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
A = load ':INPATH:/singlefile/unicode100' as (name:chararray);
dump A;\,
Modified: pig/trunk/test/e2e/pig/tests/negative.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/negative.conf?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/negative.conf (original)
+++ pig/trunk/test/e2e/pig/tests/negative.conf Mon Feb 3 03:57:25 2014
@@ -34,6 +34,7 @@ $cfg = {
'tests' => [
{
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
b = group a by name;
@@ -48,6 +49,7 @@ dump c;\,
'tests' => [
{
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\a = load '/user/gates/nosuchfile'; dump a;\,
'expected_err_regex' => "ERROR 2118: Input path does not exist",
},
@@ -248,6 +250,7 @@ store a into ':INPATH:/singlefile/fileex
{
# missing quotes around command
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $2, $1, $0;
@@ -259,6 +262,7 @@ dump C;#,
{
# input spec missing parenthesis
'num' => 2,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define CMD `perl PigStreaming.pl foo -` input 'foo' using PigStorage() ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -269,6 +273,7 @@ dump B;#,
{
# no serializer name after using
'num' => 3,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define CMD `perl PigStreaming.pl foo -` output ('foo' using );
A = load ':INPATH:/singlefile/studenttab10k';
@@ -279,6 +284,7 @@ dump B;#,
{
# alias name missing from define
'num' => 4,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define `perl PigStreaming.pl foo -`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -289,6 +295,7 @@ dump B;#,
{
# quotes missing from name of the file in ship script
'num' => 5,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define CMD `perl PigStreaming.pl foo -` ship(:SCRIPTHOMEPATH:/PigStreaming.pl);
A = load ':INPATH:/singlefile/studenttab10k';
@@ -306,6 +313,7 @@ dump B;#,
# Define uses using non-existent command (autoship)
'num' => 1,
'execonly' => 'mapred',
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreamingNotThere.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -316,6 +324,7 @@ dump B;\,
{
# Define uses non-existent command with ship clause
'num' => 2,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreamingNotThere.pl foo -` ship(':SCRIPTHOMEPATH:/PigStreamingNotThere.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -339,6 +348,7 @@ dump E;\,
{
# Define uses non-existent serializer
'num' => 4,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl foo -` input('foo' using SerializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -349,6 +359,7 @@ dump B;\,
{
# Define uses non-existent deserializer
'num' => 5,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl` output(stdout using DeserializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -359,6 +370,7 @@ dump B;\,
{
# Invalid skip path
'num' => 6,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
set stream.skippath 'foo';
define CMD `perl PigStreaming.pl`;
@@ -370,6 +382,7 @@ dump B;\,
{
# Invalid command alias in stream operator
'num' => 7,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -380,6 +393,7 @@ dump B;\,
{
# Invalid operator alias in stream operator
'num' => 8,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -483,6 +497,7 @@ store D into ':OUTPATH:';\,
# Define uses using non-existent command
'num' => 1,
'execonly' => 'local',
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreamingNotThere.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Feb 3 03:57:25 2014
@@ -4499,9 +4499,9 @@ store C into ':OUTPATH:';\,
},
{
# Test Union using merge with incompatible types. float->bytearray and chararray->bytearray
- 'num' => 8,
- 'delimiter' => ' ',
- 'pig' => q\
+ 'num' => 8,
+ 'delimiter' => ' ',
+ 'pig' => q\
A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int);
B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:chararray);
C = union onschema A, B;
@@ -4511,17 +4511,18 @@ A = load ':INPATH:/singlefile/studenttab
B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray);
C = union A, B;
store C into ':OUTPATH:';\,
- }
- ]
+ }
+ ]
},
- {
+ {
- # Test Union using merge with Simple data types
- 'name' => 'UdfDistributedCache',
+ # Test Union using merge with Simple data types
+ 'name' => 'UdfDistributedCache',
'tests' => [
- {
- 'num' => 1,
+ {
+ 'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'execonly' => 'mapred', # since distributed cache is not supported in local mode
'pig' => q?
register :FUNCPATH:/testudf.jar;
@@ -4531,8 +4532,8 @@ store C into ':OUTPATH:';\,
c = foreach b generate udfdc(age);
dump c;?,
'expected_out_regex' => ":UdfDistributedCache_1_out:",
- },
- ]
+ },
+ ]
}, {
'name' => 'MonitoredUDF',
'tests' => [
Modified: pig/trunk/test/org/apache/pig/test/TestAssert.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAssert.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAssert.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAssert.java Mon Feb 3 03:57:25 2014
@@ -22,10 +22,12 @@ import static org.apache.pig.builtin.moc
import static org.apache.pig.builtin.mock.Storage.tuple;
import java.util.List;
+import java.util.Properties;
import junit.framework.Assert;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
@@ -84,9 +86,36 @@ public class TestAssert {
try {
pigServer.openIterator("A");
} catch (FrontendException fe) {
+ Assert.assertTrue(fe.getCause().getCause().getMessage().contains("Assertion violated"));
+ }
+ }
+
+ /**
+ * Verify that ASSERT operator works. Disable fetch for this testcase.
+ * @throws Exception
+ */
+ @Test
+ public void testNegativeWithoutFetch() throws Exception {
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ Data data = resetData(pigServer);
+
+ data.set("foo",
+ tuple(1),
+ tuple(2),
+ tuple(3)
+ );
+
+ pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (i:int);");
+ pigServer.registerQuery("ASSERT A BY i > 1 , 'i should be greater than 1';");
+
+ Properties props = pigServer.getPigContext().getProperties();
+ props.setProperty(PigConfiguration.OPT_FETCH, "false");
+ try {
+ pigServer.openIterator("A");
+ } catch (FrontendException fe) {
Assert.assertTrue(fe.getCause().getMessage().contains(
"Job terminated with anomalous status FAILED"));
}
-
}
+
}
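
Note the two TestAssert variants above: on the fetch path the "Assertion violated" message sits one getCause() level deeper than the MR path's "Job terminated with anomalous status FAILED". A test that should not care which path ran could walk to the innermost cause instead of hard-coding the nesting depth; an illustrative helper (not in the patch, and it assumes the target message really is in the innermost cause):

    // Illustrative only: unwrap nested causes so a message check is
    // insensitive to how many wrappers the failing run path added.
    private static Throwable rootCause(Throwable t) {
        Throwable root = t;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        return root;
    }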
Modified: pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java Mon Feb 3 03:57:25 2014
@@ -78,6 +78,7 @@ public class TestAutoLocalMode {
@Before
public void setUp() throws Exception{
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.OPT_FETCH, "false");
pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, String.valueOf("true"));
pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, "200");
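
TestAutoLocalMode disables fetch before enabling the auto-local settings, presumably because a fetchable dump never submits a Hadoop job at all, leaving nothing for the PIG_AUTO_LOCAL_INPUT_MAXBYTES heuristic to convert to local mode.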
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1563765&r1=1563764&r2=1563765&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Feb 3 03:57:25 2014
@@ -29,10 +29,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.BinStorage;
@@ -1011,17 +1013,22 @@ public class TestEvalPipeline2 {
Iterator<Tuple> iter = pigServer.openIterator("e");
+ Map<Object, Object> expected = new HashMap<Object, Object>(3);
+ expected.put(1, null);
+ expected.put(2, null);
+ expected.put(4, null);
+
Tuple t = iter.next();
Assert.assertTrue(t.size()==1);
- Assert.assertTrue((Integer)t.get(0)==1);
+ Assert.assertTrue(expected.containsKey(t.get(0)));
t = iter.next();
Assert.assertTrue(t.size()==1);
- Assert.assertTrue((Integer)t.get(0)==4);
+ Assert.assertTrue(expected.containsKey(t.get(0)));
t = iter.next();
Assert.assertTrue(t.size()==1);
- Assert.assertTrue((Integer)t.get(0)==2);
+ Assert.assertTrue(expected.containsKey(t.get(0)));
Assert.assertFalse(iter.hasNext());
}
@@ -1595,10 +1602,36 @@ public class TestEvalPipeline2 {
pigServer.openIterator("b");
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
+ String message = e.getCause().getCause().getMessage();
+ Assert.assertTrue(message.contains(ArrayList.class.getName()));
}
}
+ // See PIG-1826
+ @Test
+ public void testNonStandardDataWithoutFetch() throws Exception{
+ Properties props = pigServer.getPigContext().getProperties();
+ props.setProperty(PigConfiguration.OPT_FETCH, "false");
+ String[] input1 = {
+ "0",
+ };
+ try {
+ Util.createInputFile(cluster, "table_testNonStandardDataWithoutFetch", input1);
+ pigServer.registerQuery("a = load 'table_testNonStandardDataWithoutFetch' as (a0);");
+ pigServer.registerQuery("b = foreach a generate " + UDFWithNonStandardType.class.getName() + "(a0);");
+
+ try {
+ pigServer.openIterator("b");
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
+ }
+ }
+ finally {
+ props.setProperty(PigConfiguration.OPT_FETCH, "true");
+ }
+ }
+
// See PIG-2078
@Test
public void testProjectNullBag() throws Exception{
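
The PIG-1826 test above is loosened because row order is no longer guaranteed once the same dump can run on either the fetch or the MR path; the fixed 1/4/2 sequence becomes three membership checks against {1, 2, 4}. A hypothetical, slightly stricter rewrite collects the values into a set (it assumes the test's iter and Tuple variables, plus java.util.Arrays/HashSet/Set); unlike the containsKey checks, it would also fail if the same value came back twice:

    // Hypothetical order-insensitive rewrite of the assertions above.
    Set<Object> actual = new HashSet<Object>();
    int rows = 0;
    while (iter.hasNext()) {
        Tuple t = iter.next();
        Assert.assertTrue(t.size() == 1);
        actual.add(t.get(0));
        rows++;
    }
    // Exactly three rows, covering 1, 2 and 4 with no duplicates.
    Assert.assertEquals(3, rows);
    Assert.assertEquals(new HashSet<Object>(Arrays.asList((Object) 1, 2, 4)), actual);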
Added: pig/trunk/test/org/apache/pig/test/TestFetch.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFetch.java?rev=1563765&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFetch.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestFetch.java Mon Feb 3 03:57:25 2014
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.parser.ParserTestingUtils;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFetch {
+
+ private PigServer pigServer;
+
+ private static File inputFile1;
+ private static File inputFile2;
+
+ private static final long SEED = 1013;
+ private static final Random r = new Random(SEED);
+
+ @BeforeClass
+ public static void setUpOnce() throws Exception {
+
+ String[] data1 = {
+ "1 {(1,2,7,8,b),(1,3,3,5,a)}",
+ "2 {(2,4,6,6,k)}",
+ "3 {(3,7,8,9,p),(3,6,3,1,n)}",
+ "5 {(5,1,1,2,c)}"
+ };
+
+ String[] data2 = {
+ "1 3 a",
+ "1 2 b",
+ "2 4 k",
+ "3 6 n",
+ "3 7 p",
+ "5 1 c"
+ };
+
+ inputFile1 = Util.createInputFile("tmp", "testFetchData1.txt", data1);
+ inputFile2 = Util.createInputFile("tmp", "testFetchData2.txt", data2);
+
+ }
+
+ @Before
+ public void setUp() throws Exception{
+ pigServer = new PigServer(ExecType.LOCAL, new Properties());
+ }
+
+ @Test
+ public void test1() throws Exception {
+ String query =
+ "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' " +
+ "using PigStorage(' ') as (a:int, b: " +
+ "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});" +
+ "C = foreach A {" +
+ " temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);" +
+ " temp2 = filter temp1 by key < 400;" +
+ " temp3 = limit temp2 3;" +
+ " temp4 = foreach temp3 generate key-r as (id:int);" +
+ " temp5 = limit temp4 4;" +
+ " temp6 = filter temp5 by id < 100;" +
+ " generate flatten(temp6) as (id:int), a;" +
+ "};" +
+ "D = foreach C generate (" +
+ " case id % 4" +
+ " when 0 then true" +
+ " else false" +
+ " end" +
+ ") as (check:boolean);";
+
+ LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query);
+
+ PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine())
+ .compile(lp, null);
+
+ boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+ assertTrue(planFetchable);
+
+ }
+
+ @Test
+ public void test2() throws Exception {
+ Properties properties = pigServer.getPigContext().getProperties();
+ properties.setProperty(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "gz");
+ properties.setProperty(PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "true");
+ properties.setProperty(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "tfile");
+
+ String query =
+ "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' " +
+ "using PigStorage(' ') as (a:int, b: " +
+ "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});" +
+ "C = foreach A {" +
+ " temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);" +
+ " temp2 = filter temp1 by key < 400;" +
+ " temp3 = limit temp2 3;" +
+ " temp4 = foreach temp3 generate key-r as (id:int);" +
+ " temp5 = limit temp4 4;" +
+ " temp6 = filter temp5 by id < 100;" +
+ " generate flatten(temp6) as (id:int), a;" +
+ "};" +
+ "D = foreach C generate (" +
+ " case id % 4" +
+ " when 0 then true" +
+ " else false" +
+ " end" +
+ ") as (check:boolean);" +
+ "store D into 'out' using org.apache.pig.impl.io.TFileStorage();";
+
+ LogicalPlan lp = ParserTestingUtils.generateLogicalPlan(query);
+
+ PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext().getExecutionEngine())
+ .compile(lp, null);
+
+ boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+ assertFalse(planFetchable);
+
+ }
+
+ @Test
+ public void test3() throws Exception {
+ File scriptFile = null;
+ try {
+ String[] script = {
+ "A = load '"+Util.encodeEscape(inputFile1.getAbsolutePath()) +"' ",
+ "using PigStorage(' ') as (a:int, b: ",
+ "{t:(t1:int,t2:int,t3:int,t4:int,c:chararray)});",
+ "C = foreach A {",
+ " temp1 = foreach b generate t1*100 as (key:int), ((t2+t3)*10) as (r:int);",
+ " temp2 = filter temp1 by key < 400;",
+ " temp3 = limit temp2 3;",
+ " temp4 = foreach temp3 generate key-r as (id:int);",
+ " temp5 = limit temp4 4;",
+ " temp6 = filter temp5 by id < 100;",
+ " generate flatten(temp6) as (id:int), a;",
+ "};",
+ "D = foreach C generate (",
+ " case id % 4",
+ " when 0 then true",
+ " else false",
+ " end",
+ ") as (check:boolean);"
+ };
+
+ scriptFile = Util.createLocalInputFile( "testFetchTest3.pig", script);
+ pigServer.registerScript(scriptFile.getAbsolutePath());
+
+ Iterator<Tuple> it = pigServer.openIterator("D");
+ while (it.hasNext()) {
+ assertEquals(false, it.next().get(0));
+ assertEquals(true, it.next().get(0));
+ }
+ }
+ finally {
+ if (scriptFile != null) {
+ scriptFile.delete();
+ }
+ }
+ }
+
+ @Test
+ public void test4() throws Exception {
+ File scriptFile = null;
+ try {
+ String[] script = {
+ "A = load '"+Util.encodeEscape(inputFile2.getAbsolutePath()) +"' ",
+ "using PigStorage(' ') as (a:int, b:int, c:chararray);",
+ "B = limit A 2;",
+ "C = limit A 1;",
+ "D = union A,B,C;" //introduces an implicit split operator
+ };
+
+ scriptFile = Util.createLocalInputFile( "testFetchTest4.pig", script);
+ pigServer.registerScript(scriptFile.getAbsolutePath());
+ pigServer.setBatchOn();
+
+ LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer);
+ PhysicalPlan pp = ((MRExecutionEngine)
+ pigServer.getPigContext().getExecutionEngine()).compile(lp, null);
+ boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+ assertFalse(planFetchable);
+
+ }
+ finally {
+ if (scriptFile != null) {
+ scriptFile.delete();
+ }
+ }
+ }
+
+ @Test
+ public void test5() throws Exception {
+
+ File scriptFile = null;
+ try {
+ String[] script = {
+ "A = load '"+Util.encodeEscape(inputFile2.getAbsolutePath()) +"' ",
+ "using PigStorage(' ') as (a:int, b:int, c:chararray);",
+ "B = group A by a;"
+ };
+
+ scriptFile = Util.createLocalInputFile( "testFetchTest5.pig", script);
+ pigServer.registerScript(scriptFile.getAbsolutePath());
+ pigServer.setBatchOn();
+
+ LogicalPlan lp = TestPigStats.getLogicalPlan(pigServer);
+ PhysicalPlan pp = ((MRExecutionEngine)
+ pigServer.getPigContext().getExecutionEngine()).compile(lp, null);
+ boolean planFetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
+ assertFalse(planFetchable);
+
+ }
+ finally {
+ if (scriptFile != null) {
+ scriptFile.delete();
+ }
+ }
+ }
+
+ @Test
+ public void test6() throws Exception {
+ PigContext pc = pigServer.getPigContext();
+
+ PhysicalPlan pp = new PhysicalPlan();
+ POLoad poLoad = GenPhyOp.topLoadOp();
+ pp.add(poLoad);
+ POLimit poLimit = new POLimit(new OperatorKey("", r.nextLong()), -1, null);
+ pp.add(poLimit);
+ pp.connect(poLoad, poLimit);
+ POStore poStore = GenPhyOp.topStoreOp();
+ pp.addAsLeaf(poStore);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ new FetchLauncher(pc).explain(pp, pc, ps, "xml");
+ assertTrue(baos.toString().matches("(?si).*No MR jobs. Fetch only.*"));
+
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws Exception {
+ inputFile1.delete();
+ inputFile2.delete();
+ }
+
+}
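
Taken together, the TestFetch cases pin down the optimizer's contract: a plan is fetchable when it is a linear LOAD followed by map-side-only operators (nested foreach, filter, limit, a CASE expression) feeding a dump (test1, test3); it is not fetchable with an explicit store plus temp-file compression (test2), the implicit split a union introduces (test4), or a group-by that needs a reduce phase (test5); and test6 checks that explain on a fetch-only plan reports "No MR jobs. Fetch only". The probe the fetchability cases share, condensed (it assumes the tests' pigServer and lp variables):

    // Condensed from tests 1, 2, 4 and 5: compile the script's logical plan
    // and ask whether the whole plan can run as a direct HDFS fetch.
    PhysicalPlan pp = ((MRExecutionEngine) pigServer.getPigContext()
            .getExecutionEngine()).compile(lp, null);
    boolean fetchable = FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);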