You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/07 02:18:15 UTC
svn commit: r1565509 [1/3] - in /pig/branches/tez: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executione...
Author: rohini
Date: Fri Feb 7 01:18:14 2014
New Revision: 1565509
URL: http://svn.apache.org/r1565509
Log:
PIG-3748: Support for multiquery off in Tez
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
Modified:
pig/branches/tez/src/org/apache/pig/Main.java
pig/branches/tez/src/org/apache/pig/PigServer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java
pig/branches/tez/test/org/apache/pig/test/TestGrunt.java
pig/branches/tez/test/org/apache/pig/test/TestLoad.java
pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java
pig/branches/tez/test/org/apache/pig/test/TestMultiQueryBasic.java
pig/branches/tez/test/org/apache/pig/test/TestMultiQueryCompiler.java
pig/branches/tez/test/org/apache/pig/test/TestMultiQueryLocal.java
pig/branches/tez/test/org/apache/pig/test/TestPigRunner.java
pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
pig/branches/tez/test/org/apache/pig/test/TestStore.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
Modified: pig/branches/tez/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/Main.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/Main.java (original)
+++ pig/branches/tez/src/org/apache/pig/Main.java Fri Feb 7 01:18:14 2014
@@ -297,7 +297,7 @@ public class Main {
case 'M':
// turns off multiquery optimization
- properties.setProperty("opt.multiquery",""+false);
+ properties.setProperty(PigConfiguration.OPT_MULTIQUERY,""+false);
break;
case 'p':
Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Fri Feb 7 01:18:14 2014
@@ -229,7 +229,8 @@ public class PigServer {
currDAG = new Graph(false);
aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
- isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
+ isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties()
+ .getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
jobName = pigContext.getProperties().getProperty(
PigContext.JOB_NAME,
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 7 01:18:14 2014
@@ -668,7 +668,7 @@ public class MapReduceLauncher extends L
fRem.visit();
boolean isMultiQuery =
- "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.multiquery","true"));
+ "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
if (isMultiQuery) {
// reduces the number of MROpers in the MR plan generated
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri Feb 7 01:18:14 2014
@@ -93,6 +93,10 @@ public class PhyPlanVisitor extends Plan
super(plan, walker);
}
+ /**
+ * Fallback visit for physical operators that have no dedicated visitXxx
+ * method in this visitor (e.g. operators whose visit(PhyPlanVisitor)
+ * simply calls v.visit(this)). This default implementation does nothing;
+ * visitors that need to see such operators override it.
+ */
+ public void visit(PhysicalOperator op) {
+ // do nothing
+ }
+
public void visitLoad(POLoad ld) throws VisitorException{
//do nothing
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Fri Feb 7 01:18:14 2014
@@ -168,8 +168,9 @@ public class PlanHelper {
return !foundOps.isEmpty();
}
+ @Override
@SuppressWarnings("unchecked")
- private void visit(PhysicalOperator op) {
+ public void visit(PhysicalOperator op) {
if (opClass.isAssignableFrom(op.getClass())) {
foundOps.add((C) op);
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1565509&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Fri Feb 7 01:18:14 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * POValueInputTez is used read tuples from a Tez Intermediate output from a 1-1
+ * edge
+ */
+public class POValueInputTez extends PhysicalOperator implements TezLoad {
+
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POValueInputTez.class);
+ private String inputKey;
+ // TODO Change this to value only reader after implementing
+ // value only input output
+ private transient KeyValueReader reader;
+
+ public POValueInputTez(OperatorKey k) {
+ super(k);
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf)
+ throws ExecException {
+ LogicalInput input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + inputKey + " is missing");
+ }
+ try {
+ reader = (KeyValueReader) input.getReader();
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ try {
+ if (reader.next()) {
+ return new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+ } else {
+ return RESULT_EOP;
+ }
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ }
+
+ public void setInputKey(String inputKey) {
+ this.inputKey = inputKey;
+ }
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visit(this);
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String name() {
+ return "POValueInputTez - " + mKey.toString() + "\t<-\t " + inputKey;
+ }
+}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1565509&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Fri Feb 7 01:18:14 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * POValueOutputTez writes each input tuple, as the value of a record with an
+ * empty key, to every Tez output registered via {@link #addOutputKey(String)}.
+ * It is added as the leaf of a split vertex when multiquery is off.
+ */
+public class POValueOutputTez extends PhysicalOperator implements TezOutput {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POValueOutputTez.class);
+    // TODO Change this to outputKey and write only once
+    // when a shared edge support is available in Tez
+    protected Set<String> outputKeys = new HashSet<String>();
+    // TODO Change this to value only writer after implementing
+    // value only input output
+    protected transient List<KeyValueWriter> writers;
+
+    // Stateless key written with every record; it is never reassigned.
+    private static final EmptyWritable EMPTY_KEY = new EmptyWritable();
+
+    public POValueOutputTez(OperatorKey k) {
+        super(k);
+    }
+
+    /**
+     * Obtains a writer for every registered output key.
+     *
+     * @param outputs logical outputs of the task, keyed by destination vertex
+     * @param conf task configuration (not used by this operator)
+     * @throws ExecException if an output is missing for any registered key,
+     *         or if obtaining its writer fails
+     */
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        writers = new ArrayList<KeyValueWriter>();
+        for (String outputKey : outputKeys) {
+            LogicalOutput output = outputs.get(outputKey);
+            if (output == null) {
+                throw new ExecException("Output to vertex " + outputKey
+                        + " is missing");
+            }
+            try {
+                KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+                writers.add(writer);
+                LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+            } catch (Exception e) {
+                throw new ExecException(e);
+            }
+        }
+    }
+
+    /** Registers the operator key (as a string) of a vertex to write to. */
+    public void addOutputKey(String outputKey) {
+        outputKeys.add(outputKey);
+    }
+
+    /** Unregisters a previously added destination vertex key. */
+    public void removeOutputKey(String outputKey) {
+        outputKeys.remove(outputKey);
+    }
+
+    /**
+     * Pulls input until a tuple is available and writes it to every attached
+     * writer, then returns RESULT_EMPTY. NULL input results are skipped; EOP
+     * and ERR input results are returned unchanged.
+     *
+     * @throws ExecException if outputs were never attached or a write fails
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        Result inp;
+        while (true) {
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR) {
+                break;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+            if (writers == null) {
+                // attachOutputs() was never called; fail descriptively
+                // instead of with an NPE.
+                throw new ExecException("Outputs to vertices " + outputKeys
+                        + " have not been attached");
+            }
+            for (KeyValueWriter writer : writers) {
+                try {
+                    writer.write(EMPTY_KEY, inp.result);
+                } catch (IOException e) {
+                    throw new ExecException(e);
+                }
+            }
+            return RESULT_EMPTY;
+        }
+        return inp;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // TODO Illustrate is not implemented for this operator
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "POValueOutputTez - " + mKey.toString() + "\t->\t " + outputKeys;
+    }
+
+    /**
+     * A {@link Writable} with no content. It is written as the key of every
+     * record, since only the value (the tuple) carries data.
+     */
+    public static class EmptyWritable implements Writable {
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            // Nothing to serialize.
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            // Nothing to deserialize.
+        }
+    }
+
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Fri Feb 7 01:18:14 2014
@@ -155,6 +155,10 @@ public class PigProcessor implements Log
for (POIdentityInOutTez identityInOut : identityInOuts){
identityInOut.attachInputs(inputs, conf);
}
+ LinkedList<POValueInputTez> valueInputs = PlanHelper.getPhysicalOperators(execPlan, POValueInputTez.class);
+ for (POValueInputTez input : valueInputs){
+ input.attachInputs(inputs, conf);
+ }
LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
for (POFRJoinTez broadcast : broadcasts){
broadcast.attachInputs(inputs, conf);
@@ -170,6 +174,10 @@ public class PigProcessor implements Log
for (POLocalRearrangeTez lr : rearranges){
lr.attachOutputs(outputs, conf);
}
+ LinkedList<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(execPlan, POValueOutputTez.class);
+ for (POValueOutputTez output : valueOutputs){
+ output.attachOutputs(outputs, conf);
+ }
for (Entry<String, LogicalOutput> entry : outputs.entrySet()){
LogicalOutput logicalOutput = entry.getValue();
if (logicalOutput instanceof MROutput){
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Fri Feb 7 01:18:14 2014
@@ -186,7 +186,7 @@ public class TezCompiler extends PhyPlan
// Segment a single DAG into a DAG graph
public TezPlanContainer getPlanContainer() throws PlanException {
TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
- TezPlanContainerNode node = new TezPlanContainerNode(new OperatorKey(scope, nig.getNextNodeId(scope)), tezPlan);
+ TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan);
tezPlanContainer.add(node);
tezPlanContainer.split(node);
return tezPlanContainer;
@@ -480,7 +480,7 @@ public class TezCompiler extends PhyPlan
* @param op Operator to remove
* @throws VisitorException
*/
- public void removeDupOpTreeOfSplit(TezOperPlan plan, TezOperator op)
+ private void removeDupOpTreeOfSplit(TezOperPlan plan, TezOperator op, boolean isMultiQuery)
throws VisitorException {
Stack<TezOperator> stack = new Stack<TezOperator>();
stack.push(op);
@@ -488,31 +488,47 @@ public class TezCompiler extends PhyPlan
op = stack.pop();
List<TezOperator> predecessors = plan.getPredecessors(op);
if (predecessors != null) {
- for (TezOperator pred : predecessors) {
- List<POSplit> splits = PlanHelper.getPhysicalOperators(
- pred.plan, POSplit.class);
- if (splits.isEmpty()) {
- stack.push(pred);
- } else {
- for (POSplit split : splits) {
- PhysicalPlan planToRemove = null;
- for (PhysicalPlan splitPlan : split.getPlans()) {
- PhysicalOperator phyOp = splitPlan.getLeaves().get(0);
- if (phyOp instanceof POLocalRearrangeTez) {
- POLocalRearrangeTez lr = (POLocalRearrangeTez) phyOp;
- if (lr.getOutputKey().equals(
- op.getOperatorKey().toString())) {
- planToRemove = splitPlan;
- break;
+ if (isMultiQuery) {
+ for (TezOperator pred : predecessors) {
+ if (!pred.isSplitOperator()) {
+ stack.push(pred);
+ } else {
+ List<POSplit> splits = PlanHelper.getPhysicalOperators(
+ pred.plan, POSplit.class);
+ for (POSplit split : splits) {
+ PhysicalPlan planToRemove = null;
+ for (PhysicalPlan splitPlan : split.getPlans()) {
+ PhysicalOperator phyOp = splitPlan
+ .getLeaves().get(0);
+ if (phyOp instanceof POLocalRearrangeTez) {
+ POLocalRearrangeTez lr = (POLocalRearrangeTez) phyOp;
+ if (lr.getOutputKey().equals(
+ op.getOperatorKey().toString())) {
+ planToRemove = splitPlan;
+ break;
+ }
}
}
- }
- if (planToRemove != null) {
- split.getPlans().remove(planToRemove);
- break;
+ if (planToRemove != null) {
+ split.getPlans().remove(planToRemove);
+ break;
+ }
}
}
}
+ } else {
+ for (TezOperator pred : predecessors) {
+ // Remove everything till we encounter another split
+ if (!pred.isSplitOperator()) {
+ stack.push(pred);
+ } else {
+ // If split operator, just remove from the output
+ POValueOutputTez valueOut = (POValueOutputTez)pred.plan.getLeaves().get(0);
+ valueOut.removeOutputKey(op.getOperatorKey().toString());
+ //TODO Handle shared edge when available in Tez
+ pred.outEdges.remove(op.getOperatorKey().toString());
+ }
+ }
}
}
plan.remove(op);
@@ -520,6 +536,42 @@ public class TezCompiler extends PhyPlan
}
/**
+     * In case of multiple levels of split, after removing the duplicate tree we
+     * need to reset the inputs of operators in the old tree, because some inputs
+     * of the PhysicalOperators in the original tree will have been overwritten to
+     * refer to operators in the duplicate tree. For example, a POFilter's inputs
+     * will refer to the duplicate tree's POValueInputTez even though the POFilter
+     * is connected to the original split tree's POValueInputTez.
+     *
+     * Walks the Tez plan upwards from {@code op}. The physical plan of every
+     * predecessor is reset, and the walk continues past a predecessor only if
+     * it is not a split operator.
+     *
+     * @param plan the Tez operator plan
+     * @param op the operator whose predecessors are reset
+     */
+    private void resetInputsOfPredecessors(TezOperPlan plan, TezOperator op) {
+        Stack<TezOperator> stack = new Stack<TezOperator>();
+        stack.push(op);
+        while (!stack.isEmpty()) {
+            op = stack.pop();
+            List<TezOperator> predecessors = plan.getPredecessors(op);
+            if (predecessors != null) {
+                for (TezOperator pred : predecessors) {
+                    // Reset starting from the leaves of the predecessor's physical plan.
+                    resetInputs(pred.plan, pred.plan.getLeaves());
+                    // A split operator's own plan is reset, but we do not climb past it.
+                    if (!pred.isSplitOperator()) {
+                        stack.push(pred);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Recursively resets the inputs of the operators above {@code ops} in
+     * {@code plan}, so that each operator's inputs are its predecessors in the plan.
+     *
+     * @param plan the physical plan whose operator inputs are reset
+     * @param ops the operators to start from; may be null (the predecessors
+     *        of a root), in which case nothing is done
+     */
+    private void resetInputs(PhysicalPlan plan, List<PhysicalOperator> ops) {
+        // getPredecessors() may return null (see the preds check below), and
+        // the recursion passes that result straight in. Without this guard any
+        // chain reaching a root two levels down throws an NPE.
+        if (ops == null) {
+            return;
+        }
+        for (PhysicalOperator op : ops) {
+            List<PhysicalOperator> preds = plan.getPredecessors(op);
+            if (preds != null) {
+                for (PhysicalOperator pred : preds) {
+                    List<PhysicalOperator> predInputs = plan.getPredecessors(pred);
+                    pred.setInputs(predInputs);
+                    resetInputs(plan, predInputs);
+                }
+            }
+        }
+    }
+
+ /**
* Merges the TezOperators in the compiledInputs into a single merged
* TezOperator.
*
@@ -686,7 +738,7 @@ public class TezCompiler extends PhyPlan
pkg.getPkgr().setDistinct(true);
plan.addAsLeaf(pkg);
- POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POProject project = new POProject(OperatorKey.genOpKey(scope));
project.setResultType(DataType.TUPLE);
project.setStar(false);
project.setColumn(0);
@@ -699,7 +751,7 @@ public class TezCompiler extends PhyPlan
@Override
public void visitFilter(POFilter op) throws VisitorException {
try {
- if (curTezOp.isSplitSubPlan()) {
+ if (curTezOp.isSplitSubPlan() || curTezOp.getSplitParent() != null) {
// Do not add the filter. Refer NoopFilterRemover.java of MR
PhysicalPlan filterPlan = op.getPlan();
if (filterPlan.size() == 1) {
@@ -841,7 +893,7 @@ public class TezCompiler extends PhyPlan
curTezOp.plan.addAsLeaf(forEach);
if (!pigContext.inIllustrator) {
- POLimit limitCopy = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POLimit limitCopy = new POLimit(OperatorKey.genOpKey(scope));
limitCopy.setAlias(op.getAlias());
limitCopy.setLimit(op.getLimit());
limitCopy.setLimitPlan(op.getLimitPlan());
@@ -1281,7 +1333,7 @@ public class TezCompiler extends PhyPlan
POPackage pkg = getPackage(1, DataType.BYTEARRAY);
curTezOp.plan.add(pkg);
- POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POProject project = new POProject(OperatorKey.genOpKey(scope));
project.setResultType(DataType.BAG);
project.setStar(false);
project.setColumn(1);
@@ -1327,7 +1379,7 @@ public class TezCompiler extends PhyPlan
// parallelism of POPartitionRearrange to -1, so its parallelism
// will be determined by the size of streaming table.
POPartitionRearrangeTez pr =
- new POPartitionRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ new POPartitionRearrangeTez(OperatorKey.genOpKey(scope));
try {
pr.setIndex(1);
} catch (ExecException e) {
@@ -1349,7 +1401,7 @@ public class TezCompiler extends PhyPlan
// Create POGlobalRearrange
POGlobalRearrange gr =
- new POGlobalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
+ new POGlobalRearrange(OperatorKey.genOpKey(scope), rp);
// Skewed join has its own special partitioner
gr.setResultType(DataType.TUPLE);
gr.visit(this);
@@ -1376,7 +1428,7 @@ public class TezCompiler extends PhyPlan
// Add corresponding POProjects
for (int i=0; i < 2; i++) {
ep = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POProject prj = new POProject(OperatorKey.genOpKey(scope));
prj.setColumn(i+1);
prj.setOverloaded(false);
prj.setResultType(DataType.BAG);
@@ -1390,7 +1442,7 @@ public class TezCompiler extends PhyPlan
}
POForEach fe =
- new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, eps, flat);
+ new POForEach(OperatorKey.genOpKey(scope), -1, eps, flat);
fe.setResultType(DataType.TUPLE);
fe.visit(this);
@@ -1832,7 +1884,7 @@ public class TezCompiler extends PhyPlan
}
POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
- new OperatorKey(scope, nig.getNextNodeId(scope)),
+ OperatorKey.genOpKey(scope),
inputOperRearrange);
identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
oper1.plan.addAsLeaf(identityInOutTez);
@@ -1845,7 +1897,7 @@ public class TezCompiler extends PhyPlan
identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
if (limit!=-1) {
- POPackage pkg_c = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
pkg_c.setPkgr(new LitePackager());
pkg_c.getPkgr().setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
pkg_c.setNumInps(1);
@@ -1892,7 +1944,7 @@ public class TezCompiler extends PhyPlan
combinePlan.addAsLeaf(lr_c2);
}
- POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
pkg.setPkgr(new LitePackager());
pkg.getPkgr().setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
pkg.setNumInps(1);
@@ -1937,7 +1989,7 @@ public class TezCompiler extends PhyPlan
throw new PlanException(msg, errCode, PigException.BUG, ve);
}
- POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POLocalRearrangeTez lr = new POLocalRearrangeTez(OperatorKey.genOpKey(scope));
POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields);
@@ -2002,31 +2054,65 @@ public class TezCompiler extends PhyPlan
@Override
public void visitSplit(POSplit op) throws VisitorException {
try {
- if (splitsSeen.containsKey(op.getOperatorKey())) {
- // Since the plan for this split already exists in the tez plan,
- // discard the hierarchy or tez operators we constructed so far
- // till we encountered the split in this tree
- removeDupOpTreeOfSplit(tezPlan, curTezOp);
- curTezOp = startNew(op.getOperatorKey());
- } else {
- nonBlocking(op);
- if(curTezOp.isSplitSubPlan()) {
- // Split followed by another split
- // Set inputs to null as POSplit will attach input to roots
- for (PhysicalOperator root : curTezOp.plan.getRoots()) {
- root.setInputs(null);
+ boolean isMultiQuery = "true".equalsIgnoreCase(pigContext
+ .getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+
+ if (isMultiQuery) {
+ if (splitsSeen.containsKey(op.getOperatorKey())) {
+ // Since the plan for this split already exists in the tez plan,
+ // discard the hierarchy or tez operators we constructed so far
+ // till we encountered the split in this tree
+ removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
+ curTezOp = startNew(op.getOperatorKey());
+ } else {
+ nonBlocking(op);
+ if(curTezOp.isSplitSubPlan()) {
+ // Split followed by another split
+ // Set inputs to null as POSplit will attach input to roots
+ for (PhysicalOperator root : curTezOp.plan.getRoots()) {
+ root.setInputs(null);
+ }
+ TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
+ POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
+ split.addPlan(curTezOp.plan);
+ addSubPlanPropertiesToParent(splitOp, curTezOp);
+ splitsSeen.put(op.getOperatorKey(), splitOp);
+ phyToTezOpMap.put(op, splitOp);
+ } else {
+ curTezOp.setSplitOperator(true);
+ splitsSeen.put(op.getOperatorKey(), curTezOp);
+ phyToTezOpMap.put(op, curTezOp);
}
- TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
- POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
- split.addPlan(curTezOp.plan);
- addSubPlanPropertiesToParent(splitOp, curTezOp);
+ curTezOp = startNew(op.getOperatorKey());
+ }
+ } else {
+ TezOperator splitOp = curTezOp;
+ POValueOutputTez output = null;
+ if (splitsSeen.containsKey(op.getOperatorKey())) {
+ removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
+ splitOp = splitsSeen.get(op.getOperatorKey());
+ resetInputsOfPredecessors(tezPlan, splitOp);
+ output = (POValueOutputTez)splitOp.plan.getLeaves().get(0);
+ } else {
+ splitOp.setSplitOperator(true);
splitsSeen.put(op.getOperatorKey(), splitOp);
phyToTezOpMap.put(op, splitOp);
- } else {
- splitsSeen.put(op.getOperatorKey(), curTezOp);
- phyToTezOpMap.put(op, curTezOp);
+ output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+ splitOp.plan.addAsLeaf(output);
}
- curTezOp = startNew(op.getOperatorKey());
+ curTezOp = getTezOp();
+ curTezOp.setSplitParent(splitOp.getOperatorKey());
+ tezPlan.add(curTezOp);
+ output.addOutputKey(curTezOp.getOperatorKey().toString());
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
+ //TODO shared edge once support is available in Tez
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+ edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+ curTezOp.setRequestedParallelismByReference(splitOp);
+ POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+ input.setInputKey(splitOp.getOperatorKey().toString());
+ curTezOp.plan.addAsLeaf(input);
}
} catch (Exception e) {
int errCode = 2034;
@@ -2102,7 +2188,7 @@ public class TezCompiler extends PhyPlan
private POPackage getPackage(int numOfInputs, byte keyType) {
// The default value of boolean is false
boolean[] inner = new boolean[numOfInputs];
- POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
pkg.getPkgr().setInner(inner);
pkg.getPkgr().setKeyType(keyType);
pkg.setNumInps(numOfInputs);
@@ -2110,7 +2196,7 @@ public class TezCompiler extends PhyPlan
}
private TezOperator getTezOp() {
- return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ return new TezOperator(OperatorKey.genOpKey(scope));
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Feb 7 01:18:14 2014
@@ -74,7 +74,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -239,7 +239,19 @@ public class TezDagBuilder extends TezOp
edge.partitionerClass.getName());
}
+ if (from.plan.getLeaves().get(0) instanceof POValueOutputTez) {
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+ POValueOutputTez.EmptyWritable.class.getName());
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+ BinSedesTuple.class.getName());
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ POValueOutputTez.EmptyWritable.class.getName());
+ conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+ BinSedesTuple.class.getName());
+ }
+
MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
+
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Fri Feb 7 01:18:14 2014
@@ -130,5 +130,14 @@ public class TezOperPlan extends Operato
addExtraResource(entry.getKey(), entry.getValue());
}
}
+
+ @Override
+ public void remove(TezOperator op) {
+ //TODO Clean up outEdges of predecessors and inEdges of successors.
+ //TezDagBuilder would not create those edges anyway, so this is low priority.
+ super.remove(op);
+ }
+
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Fri Feb 7 01:18:14 2014
@@ -52,7 +52,11 @@ public class TezOperator extends Operato
// TODO: We need to specify parallelism per vertex in Tez. For now, we set
// them all to 1.
// Use AtomicInteger for access by reference and being able to reset in
- // TezDAGBuilder based on number of input splits. We just need mutability and not concurrency
+ // TezDAGBuilder based on number of input splits.
+ // We just need mutability and not concurrency
+ // This ensures that vertices connected by a 1-1 edge have the same parallelism
+ // even when the parallelism of the source vertex changes.
+ // Can change to int and set to -1 if TEZ-800 gets fixed.
private AtomicInteger requestedParallelism = new AtomicInteger(-1);
// TODO: When constructing Tez vertex, we have to specify how much resource
@@ -63,9 +67,17 @@ public class TezOperator extends Operato
//int requestedCpu = 1;
// Presence indicates that this TezOper is sub-plan of a POSplit.
+ // This is the case when multi-query is turned on.
// Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
private OperatorKey splitOperatorKey = null;
+ // This indicates that this TezOper has POSplit as a parent.
+ // This is the case where multi-query is turned off.
+ private OperatorKey splitParent = null;
+
+ // This indicates that this TezOper is a split operator
+ private boolean isSplitOper;
+
// Indicates that the plan creation is complete
boolean closed = false;
@@ -171,6 +183,22 @@ public class TezOperator extends Operato
return splitOperatorKey != null;
}
+ public OperatorKey getSplitParent() {
+ return splitParent;
+ }
+
+ public void setSplitParent(OperatorKey splitParent) {
+ this.splitParent = splitParent;
+ }
+
+ public boolean isSplitOperator() {
+ return isSplitOper;
+ }
+
+ public void setSplitOperator(boolean isSplitOperator) {
+ this.isSplitOper = isSplitOperator;
+ }
+
public boolean isClosed() {
return closed;
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java Fri Feb 7 01:18:14 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.impl.io.FileLocalizer;
@@ -37,7 +38,7 @@ public class TestBatchAliases {
@Before
public void setUp() throws Exception {
- System.setProperty("opt.multiquery", ""+true);
+ System.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
myPig = new PigServer(ExecType.LOCAL, new Properties());
deleteOutputFiles();
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestGrunt.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestGrunt.java Fri Feb 7 01:18:14 2014
@@ -41,6 +41,7 @@ import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
@@ -63,7 +64,7 @@ public class TestGrunt {
@BeforeClass
public static void oneTimeSetup() throws Exception {
- cluster.setProperty("opt.multiquery","true");
+ cluster.setProperty(PigConfiguration.OPT_MULTIQUERY,"true");
}
@AfterClass
Modified: pig/branches/tez/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestLoad.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestLoad.java Fri Feb 7 01:18:14 2014
@@ -35,6 +35,7 @@ import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -64,18 +65,18 @@ public class TestLoad {
PigContext pc;
PigServer[] servers;
-
+
static MiniCluster cluster = MiniCluster.buildCluster();
-
+
@Before
public void setUp() throws Exception {
FileLocalizer.deleteTempFiles();
- servers = new PigServer[] {
+ servers = new PigServer[] {
new PigServer(ExecType.MAPREDUCE, cluster.getProperties()),
new PigServer(ExecType.LOCAL, new Properties())
- };
+ };
}
-
+
@Test
public void testGetNextTuple() throws IOException {
pc = servers[0].getPigContext();
@@ -88,10 +89,10 @@ public class TestLoad {
POLoad ld = GenPhyOp.topLoadOp();
ld.setLFile(inpFSpec);
ld.setPc(pc);
-
+
DataBag inpDB = DefaultBagFactory.getInstance().newDefaultBag();
BufferedReader br = new BufferedReader(new FileReader("test/org/apache/pig/test/data/InputFiles/passwd"));
-
+
for(String line = br.readLine();line!=null;line=br.readLine()){
String[] flds = line.split(":",-1);
Tuple t = new DefaultTuple();
@@ -113,7 +114,7 @@ public class TestLoad {
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
@Test
public void testLoadRemoteRel() throws Exception {
for (PigServer pig : servers) {
@@ -144,7 +145,7 @@ public class TestLoad {
pc = servers[0].getPigContext();
boolean noConversionExpected = true;
checkLoadPath("hdfs:/tmp/test","hdfs:/tmp/test", noConversionExpected);
-
+
// check if a location 'hdfs:<abs path>' can actually be read using PigStorage
String[] inputFileNames = new String[] {
"/tmp/TestLoad-testLoadRemoteAbsSchema-input.txt"};
@@ -198,18 +199,18 @@ public class TestLoad {
boolean noConversionExpected = true;
checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3",
"hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3", noConversionExpected );
-
- // check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be
+
+ // check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be
// read using PigStorage
String[] inputFileNames = new String[] {
"/tmp/TestLoad-testCommaSeparatedString3-input1.txt",
"/tmp/TestLoad-testCommaSeparatedString3-input2.txt"};
- String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" +
+ String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" +
inputFileNames[1];
testLoadingMultipleFiles(inputFileNames, inputString);
-
+
}
-
+
@Test
public void testCommaSeparatedString4() throws Exception {
for (PigServer pig : servers) {
@@ -224,12 +225,12 @@ public class TestLoad {
pc = pig.getPigContext();
checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
}
-
- // check if a location '<abs path>,<relative path>' can actually be
+
+ // check if a location '<abs path>,<relative path>' can actually be
// read using PigStorage
String loadLocationString = "/tmp/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
"TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to /tmp in checkLoadPath()
-
+
String[] inputFileNames = new String[] {
"/tmp/TestLoad-testCommaSeparatedStringMixed-input1.txt",
"/tmp/TestLoad-testCommaSeparatedStringMixed-input2.txt",
@@ -237,7 +238,7 @@ public class TestLoad {
pc = servers[0].getPigContext(); // test in map reduce mode
testLoadingMultipleFiles(inputFileNames, loadLocationString);
}
-
+
@Test
public void testCommaSeparatedString6() throws Exception {
for (PigServer pig : servers) {
@@ -245,7 +246,7 @@ public class TestLoad {
checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
}
}
-
+
@Test
public void testNonDfsLocation() throws Exception {
String nonDfsUrl = "har:///user/foo/f.har";
@@ -256,11 +257,11 @@ public class TestLoad {
nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
assertEquals(nonDfsUrl, load.getFileSpec().getFileName());
}
-
+
@SuppressWarnings("unchecked")
- private void testLoadingMultipleFiles(String[] inputFileNames,
+ private void testLoadingMultipleFiles(String[] inputFileNames,
String loadLocationString) throws IOException, ParserException {
-
+
String[][] inputStrings = new String[][] {
new String[] { "hello\tworld"},
new String[] { "bye\tnow"},
@@ -270,7 +271,7 @@ public class TestLoad {
(Tuple) Util.getPigConstant("('hello', 'world')"),
(Tuple) Util.getPigConstant("('bye', 'now')"),
(Tuple) Util.getPigConstant("('all', 'good')")});
-
+
List<Tuple> expectedBasedOnNumberOfInputs = new ArrayList<Tuple>();
for(int i = 0; i < inputFileNames.length; i++) {
Util.createInputFile(pc, inputFileNames[i], inputStrings[i]);
@@ -280,7 +281,7 @@ public class TestLoad {
servers[0].registerQuery(" a = load '" + loadLocationString + "' as " +
"(s1:chararray, s2:chararray);");
Iterator<Tuple> it = servers[0].openIterator("a");
-
+
List<Tuple> actual = new ArrayList<Tuple>();
while(it.hasNext()) {
actual.add(it.next());
@@ -294,40 +295,40 @@ public class TestLoad {
}
}
}
-
+
private void checkLoadPath(String orig, String expected) throws Exception {
checkLoadPath(orig, expected, false);
}
- private void checkLoadPath(String orig, String expected,
+ private void checkLoadPath(String orig, String expected,
boolean noConversionExpected) throws Exception {
-
+
boolean[] multiquery = {true, false};
-
+
for (boolean b : multiquery) {
- pc.getProperties().setProperty("opt.multiquery", "" + b);
-
+ pc.getProperties().setProperty(PigConfiguration.OPT_MULTIQUERY, "" + b);
+
DataStorage dfs = pc.getDfs();
dfs.setActiveContainer(dfs.asContainer("/tmp"));
Map<String, String> fileNameMap = new HashMap<String, String>();
-
+
QueryParserDriver builder = new QueryParserDriver(pc, "Test-Load", fileNameMap);
-
+
String query = "a = load '"+orig+"';";
LogicalPlan lp = builder.parse(query);
assertTrue(lp.size()>0);
Operator op = lp.getSources().get(0);
-
+
assertTrue(op instanceof LOLoad);
LOLoad load = (LOLoad)op;
-
+
String p = load.getFileSpec().getFileName();
System.err.println("DEBUG: p:" + p + " expected:" + expected +", exectype:" + pc.getExecType());
if(noConversionExpected) {
assertEquals(expected, p);
} else {
String protocol = pc.getExecType() == ExecType.MAPREDUCE ? "hdfs" : "file";
- // regex : A word character, i.e. [a-zA-Z_0-9] or '-' followed by ':' then any characters
+ // regex : A word character, i.e. [a-zA-Z_0-9] or '-' followed by ':' then any characters
String regex = "[\\-\\w:\\.]";
assertTrue(p.matches(".*" + protocol + "://" + regex + "*.*"));
assertEquals(expected, p.replaceAll(protocol + "://" + regex + "*/", "/"));
Modified: pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java Fri Feb 7 01:18:14 2014
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Properties;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.data.Tuple;
@@ -50,17 +51,17 @@ public class TestMultiQuery {
Util.copyFromLocalToLocal(
"test/org/apache/pig/test/data/passwd2", "passwd2");
Properties props = new Properties();
- props.setProperty("opt.multiquery", ""+true);
+ props.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
myPig = new PigServer(ExecType.LOCAL, props);
}
-
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd");
Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd2");
deleteOutputFiles();
}
-
+
@Before
public void setUp() throws Exception {
deleteOutputFiles();
@@ -75,9 +76,9 @@ public class TestMultiQuery {
public void testMultiQueryJiraPig1438() throws Exception {
// test case: merge multiple distinct jobs
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\t2\t3",
"2\t3\t4",
@@ -85,9 +86,9 @@ public class TestMultiQuery {
"2\t3\t4",
"1\t2\t3"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
-
+
myPig.setBatchOn();
myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
@@ -98,58 +99,58 @@ public class TestMultiQuery {
myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
myPig.registerQuery("store D1 into 'output1';");
- myPig.registerQuery("store D2 into 'output2';");
-
+ myPig.registerQuery("store D2 into 'output2';");
+
myPig.executeBatch();
-
- myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
+
+ myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
Iterator<Tuple> iter = myPig.openIterator("E");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(1,2)",
"(2,3)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
-
- myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
+
+ myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
iter = myPig.openIterator("E");
expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(2,3)",
"(3,4)"
});
-
+
counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
}
-
+
@Test
public void testMultiQueryJiraPig1252() throws Exception {
// test case: Problems with secondary key optimization and multiquery
// diamond optimization
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\t2\t3",
"2\t3\t4",
"3\t\t5",
"5\t6\t6",
- "6\t\t7"
+ "6\t\t7"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
@@ -161,20 +162,20 @@ public class TestMultiQuery {
myPig.registerQuery("split B into C if splitcond != '', D if splitcond == '';");
myPig.registerQuery("E = group C by splitcond;");
myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
-
+
Iterator<Tuple> iter = myPig.openIterator("F");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(1,2)",
"(2,3)",
"(3,5)",
"(5,6)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
@@ -184,22 +185,22 @@ public class TestMultiQuery {
public void testMultiQueryJiraPig1169() throws Exception {
// test case: Problems with some top N queries
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\t2\t3",
"2\t3\t4",
"3\t4\t5",
"5\t6\t7",
- "6\t7\t8"
+ "6\t7\t8"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
-
+
myPig.setBatchOn();
- myPig.registerQuery("A = load '" + INPUT_FILE
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ "' as (a:int, b, c);");
myPig.registerQuery("A1 = Order A by a desc parallel 3;");
myPig.registerQuery("A2 = limit A1 2;");
@@ -209,105 +210,105 @@ public class TestMultiQuery {
myPig.executeBatch();
myPig.registerQuery("B = load 'output2' as (a:int, b, c);");
-
+
Iterator<Tuple> iter = myPig.openIterator("B");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(6,7,8)",
"(5,6,7)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
}
-
+
@Test
public void testMultiQueryJiraPig1171() throws Exception {
// test case: Problems with some top N queries
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\tapple\t3",
"2\torange\t4",
- "3\tpersimmon\t5"
+ "3\tpersimmon\t5"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
- myPig.registerQuery("A = load '" + INPUT_FILE
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ "' as (a:long, b, c);");
myPig.registerQuery("A1 = Order A by a desc;");
myPig.registerQuery("A2 = limit A1 1;");
- myPig.registerQuery("B = load '" + INPUT_FILE
+ myPig.registerQuery("B = load '" + INPUT_FILE
+ "' as (a:long, b, c);");
myPig.registerQuery("B1 = Order B by a desc;");
myPig.registerQuery("B2 = limit B1 1;");
-
+
myPig.registerQuery("C = cross A2, B2;");
-
+
Iterator<Tuple> iter = myPig.openIterator("C");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(3L,'persimmon',5,3L,'persimmon',5)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
}
-
+
@Test
public void testMultiQueryJiraPig1157() throws Exception {
// test case: Sucessive replicated joins do not generate Map Reduce plan and fails due to OOM
-
+
String INPUT_FILE = "abc";
String INPUT_FILE_1 = "abc";
-
+
String[] inputData = {
"1\tapple\t3",
"2\torange\t4",
- "3\tpersimmon\t5"
+ "3\tpersimmon\t5"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
- myPig.registerQuery("A = load '" + INPUT_FILE
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ "' as (a:long, b, c);");
myPig.registerQuery("A1 = FOREACH A GENERATE a;");
myPig.registerQuery("B = GROUP A1 BY a;");
- myPig.registerQuery("C = load '" + INPUT_FILE_1
+ myPig.registerQuery("C = load '" + INPUT_FILE_1
+ "' as (x:long, y);");
- myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
- myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
-
+ myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
+ myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
+
Iterator<Tuple> iter = myPig.openIterator("E");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(1L,'apple',3,1L,'apple',1L,{(1L)})",
"(2L,'orange',4,2L,'orange',2L,{(2L)})",
"(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
@@ -316,7 +317,7 @@ public class TestMultiQuery {
@Test
public void testMultiQueryJiraPig1068() throws Exception {
- // test case: COGROUP fails with 'Type mismatch in key from map:
+ // test case: COGROUP fails with 'Type mismatch in key from map:
// expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'
String INPUT_FILE = "pig-1068.txt";
@@ -324,36 +325,36 @@ public class TestMultiQuery {
String[] inputData = {
"10\tapple\tlogin\tjar",
"20\torange\tlogin\tbox",
- "30\tstrawberry\tquit\tbot"
+ "30\tstrawberry\tquit\tbot"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
- myPig.registerQuery("logs = load '" + INPUT_FILE
+ myPig.registerQuery("logs = load '" + INPUT_FILE
+ "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
- myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
+ myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
+ "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
-
+
Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"('apple',{},{('apple','jar',1L)})",
"('orange',{},{('orange','box',1L)})",
"('strawberry',{(30,'strawberry','quit','bot')},{})"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
@@ -364,22 +365,22 @@ public class TestMultiQuery {
myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' "
+ myPig.registerQuery("a = load 'passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
myPig.registerQuery("b = group plan1 by uname;");
- myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
+ myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
+ "generate flatten(group) as foo, tmp; };");
myPig.registerQuery("d = filter c BY foo is not null;");
myPig.registerQuery("store d into 'output1';");
myPig.registerQuery("store plan2 into 'output2';");
-
+
List<ExecJob> jobs = myPig.executeBatch();
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
-
+ }
+
@Test
public void testMultiQueryJiraPig1114() throws Exception {
@@ -390,9 +391,9 @@ public class TestMultiQuery {
String[] inputData = {
"10\tjar",
"20\tbox",
- "30\tbot"
+ "30\tbot"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
@@ -433,39 +434,39 @@ public class TestMultiQuery {
String INPUT_FILE_1 = "set1.txt";
String INPUT_FILE_2 = "set2.txt";
-
+
String[] inputData_1 = {
"login\t0\tjar",
"login\t1\tbox",
- "quit\t0\tmany"
+ "quit\t0\tmany"
};
-
+
Util.createLocalInputFile(INPUT_FILE_1, inputData_1);
-
+
String[] inputData_2 = {
"apple\tlogin\t{(login)}",
"orange\tlogin\t{(login)}",
- "strawberry\tquit\t{(login)}"
+ "strawberry\tquit\t{(login)}"
};
-
+
Util.createLocalInputFile(INPUT_FILE_2, inputData_2);
-
+
myPig.setBatchOn();
- myPig.registerQuery("set1 = load '" + INPUT_FILE_1
+ myPig.registerQuery("set1 = load '" + INPUT_FILE_1
+ "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
myPig.registerQuery("set2 = load '" + INPUT_FILE_2
+ "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
- myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
+ myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
+ "(chararray) 0 as f3;");
myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
- + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
+ + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
-
+
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"('quit','0','many','strawberry','quit','0')",
"('login','0','jar','apple','login','0')",
"('login','0','jar','orange','login','0')",
@@ -473,7 +474,7 @@ public class TestMultiQuery {
"('login','1','box','orange','login','1')",
"('login','1','box','strawberry','login','1')"
});
-
+
Iterator<Tuple> iter = myPig.openIterator("joined_sets");
int count = 0;
while (iter.hasNext()) {
@@ -481,11 +482,11 @@ public class TestMultiQuery {
}
assertEquals(expectedResults.size(), count);
}
-
+
@Test
public void testMultiQueryJiraPig1060_2() throws Exception {
- // test case:
+ // test case:
String INPUT_FILE = "pig-1060.txt";
@@ -495,9 +496,9 @@ public class TestMultiQuery {
"orange\t3",
"orange\t23",
"strawberry\t10",
- "strawberry\t34"
+ "strawberry\t34"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
@@ -530,7 +531,7 @@ public class TestMultiQuery {
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
+ }
@Test
public void testMultiQueryJiraPig920_2() throws Exception {
@@ -551,27 +552,27 @@ public class TestMultiQuery {
myPig.registerQuery("g = cogroup d by $0, e by $0;");
myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
myPig.registerQuery("store g1 into 'output2';");
-
+
List<ExecJob> jobs = myPig.executeBatch();
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
-
+ }
+
@Test
public void testMultiQueryJiraPig920_3() throws Exception {
// test case: execution of a simple diamond query
-
+
String INPUT_FILE = "pig-920.txt";
-
+
String[] inputData = {
"apple\tapple\t100\t10",
"apple\tapple\t200\t20",
"orange\torange\t100\t10",
- "orange\torange\t300\t20"
+ "orange\torange\t300\t20"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
@@ -582,22 +583,22 @@ public class TestMultiQuery {
myPig.registerQuery("c = filter a by gid > 10;");
myPig.registerQuery("d = cogroup c by $0, b by $0;");
myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
-
+
Iterator<Tuple> iter = myPig.openIterator("e");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"('apple',1L,2L)",
"('orange',1L,1L)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
- }
+ }
@Test
public void testMultiQueryJiraPig976() throws Exception {
@@ -625,7 +626,7 @@ public class TestMultiQuery {
@Test
public void testMultiQueryJiraPig976_2() throws Exception {
- // test case: key ('group') isn't part of foreach output
+ // test case: key ('group') isn't part of foreach output
// and keys have different types
myPig.setBatchOn();
@@ -671,7 +672,7 @@ public class TestMultiQuery {
public void testMultiQueryJiraPig976_4() throws Exception {
// test case: group by multi-cols and key ('group') isn't part of output
-
+
myPig.setBatchOn();
myPig.registerQuery("a = load 'passwd' " +
@@ -688,7 +689,7 @@ public class TestMultiQuery {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
}
-
+
@Test
public void testMultiQueryJiraPig976_5() throws Exception {
@@ -718,7 +719,7 @@ public class TestMultiQuery {
// test case: key ('group') has null values.
String INPUT_FILE = "pig-976.txt";
-
+
String[] inputData = {
"apple\tapple\t100\t10",
"apple\tapple\t\t20",
@@ -726,9 +727,9 @@ public class TestMultiQuery {
"orange\torange\t\t20",
"strawberry\tstrawberry\t300\t10"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
-
+
myPig.setBatchOn();
myPig.registerQuery("a = load '" + INPUT_FILE +
@@ -742,11 +743,11 @@ public class TestMultiQuery {
List<ExecJob> jobs = myPig.executeBatch();
assertTrue(jobs.size() == 2);
-
+
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
+ }
@Test
public void testMultiQueryJiraPig983_2() throws Exception {
@@ -766,15 +767,15 @@ public class TestMultiQuery {
myPig.registerQuery("f = group d by c::gid;");
myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
myPig.registerQuery("store f1 into 'output2';");
-
+
List<ExecJob> jobs = myPig.executeBatch();
assertTrue(jobs.size() == 2);
-
+
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
+ }
// --------------------------------------------------------------------------
// Helper methods