Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC
svn commit: r1784237 [9/22] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Feb 24 08:19:42 2017
@@ -29,9 +29,14 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -160,100 +165,178 @@ public class TezPlanContainer extends Op
return;
}
- TezOperator operToSegment = null;
- List<TezOperator> succs = new ArrayList<TezOperator>();
+ List<TezOperator> opersToSegment = null;
try {
// Split top down from root to leaves
- SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan);
+ // Get the list of operators closest to the root that can be segmented together
+ FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan);
finder.visit();
- operToSegment = finder.getOperatorToSegment();
+ opersToSegment = finder.getOperatorsToSegment();
} catch (VisitorException e) {
throw new PlanException(e);
}
+ if (!opersToSegment.isEmpty()) {
+ Set<TezOperator> commonSplitterPredecessors = new HashSet<>();
+ for (TezOperator operToSegment : opersToSegment) {
+ for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) {
+ commonSplitterPredecessors
+ .addAll(getCommonSplitterPredecessors(tezOperPlan,
+ operToSegment, succ));
+ }
+ }
- if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) {
- succs.addAll(tezOperPlan.getSuccessors(operToSegment));
- for (TezOperator succ : succs) {
- tezOperPlan.disconnect(operToSegment, succ);
- }
- for (TezOperator succ : succs) {
- try {
- if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) {
- // Has already been moved to a new plan by previous successor
- // as part of dependency. It could have been further split.
- // So walk the full plan to find the new plan and connect
- TezOperatorFinder finder = new TezOperatorFinder(this, succ);
- finder.visit();
- connect(planNode, finder.getPlanContainerNode());
- continue;
+ if (commonSplitterPredecessors.isEmpty()) {
+ List<TezOperator> allSuccs = new ArrayList<TezOperator>();
+ // Disconnect all the successors and move them to a new plan
+ for (TezOperator operToSegment : opersToSegment) {
+ List<TezOperator> succs = new ArrayList<TezOperator>();
+ succs.addAll(tezOperPlan.getSuccessors(operToSegment));
+ allSuccs.addAll(succs);
+ for (TezOperator succ : succs) {
+ tezOperPlan.disconnect(operToSegment, succ);
}
- TezOperPlan newOperPlan = new TezOperPlan();
+ }
+ TezOperPlan newOperPlan = new TezOperPlan();
+ for (TezOperator succ : allSuccs) {
tezOperPlan.moveTree(succ, newOperPlan);
- TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
- generateNodeOperatorKey(), newOperPlan);
- add(newPlanNode);
- connect(planNode, newPlanNode);
- split(newPlanNode);
- if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) {
- // On further split, the successor moved to a new plan container.
- // Connect to that
- TezOperatorFinder finder = new TezOperatorFinder(this, succ);
- finder.visit();
- disconnect(planNode, newPlanNode);
- connect(planNode, finder.getPlanContainerNode());
+ }
+ TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
+ generateNodeOperatorKey(), newOperPlan);
+ add(newPlanNode);
+ connect(planNode, newPlanNode);
+ split(newPlanNode);
+ } else {
+ // If there is a common splitter predecessor between operToSegment and the successor,
+ // we have to separate out that split to be able to segment.
+ // So we store the output of the split to a temp store and then change the
+ // splittees to load from it.
+ String scope = opersToSegment.get(0).getOperatorKey().getScope();
+ for (TezOperator splitter : commonSplitterPredecessors) {
+ try {
+ List<TezOperator> succs = new ArrayList<TezOperator>();
+ succs.addAll(tezOperPlan.getSuccessors(splitter));
+ FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext);
+ POStore tmpStore = getTmpStore(scope, fileSpec);
+ // Replace POValueOutputTez with POStore
+ splitter.plan.remove(splitter.plan.getLeaves().get(0));
+ splitter.plan.addAsLeaf(tmpStore);
+ splitter.segmentBelow = true;
+ splitter.setSplitter(false);
+ for (TezOperator succ : succs) {
+ // Replace POValueInputTez with POLoad
+ POLoad tmpLoad = getTmpLoad(scope, fileSpec);
+ succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad);
+ }
+ } catch (Exception e) {
+ throw new PlanException(e);
}
- } catch (VisitorException e) {
- throw new PlanException(e);
}
}
split(planNode);
}
}
- private static class SegmentOperatorFinder extends TezOpPlanVisitor {
+ private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor {
- private TezOperator operToSegment;
+ private List<TezOperator> opersToSegment = new ArrayList<>();
- public SegmentOperatorFinder(TezOperPlan plan) {
+ public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
}
- public TezOperator getOperatorToSegment() {
- return operToSegment;
+ public List<TezOperator> getOperatorsToSegment() {
+ return opersToSegment;
}
@Override
- public void visitTezOp(TezOperator tezOperator) throws VisitorException {
- if (tezOperator.needSegmentBelow() && operToSegment == null) {
- operToSegment = tezOperator;
+ public void visitTezOp(TezOperator tezOp) throws VisitorException {
+ if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) {
+ if (opersToSegment.isEmpty()) {
+ opersToSegment.add(tezOp);
+ } else {
+ // If the operator does not have a dependency on previous
+ // operators chosen for segmenting, then add it to the
+ // operators to be segmented together
+ if (!hasPredecessor(tezOp, opersToSegment)) {
+ opersToSegment.add(tezOp);
+ }
+ }
}
}
- }
-
- private static class TezOperatorFinder extends TezPlanContainerVisitor {
+ /**
+ * Check if the tezOp has one of the opsToCheck as a predecessor.
+ * It can be an immediate predecessor or one multiple levels up.
+ */
+ private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) {
+ List<TezOperator> predecessors = getPlan().getPredecessors(tezOp);
+ if (predecessors != null) {
+ for (TezOperator pred : predecessors) {
+ if (opsToCheck.contains(pred)) {
+ return true;
+ } else {
+ if (hasPredecessor(pred, opsToCheck)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
- private TezPlanContainerNode planContainerNode;
- private TezOperator operatorToFind;
+ }
- public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) {
- super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan));
- this.operatorToFind = operatorToFind;
+ private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) {
+ Set<TezOperator> splitters1 = new HashSet<>();
+ Set<TezOperator> splitters2 = new HashSet<>();
+ Set<TezOperator> processedPredecessors = new HashSet<>();
+ // Find predecessors which are splitters
+ fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
+ if (!splitters1.isEmpty()) {
+ // For the successor, traverse the rest of the plan below it and
+ // search the predecessors of its successors for any predecessor that is a splitter.
+ Set<TezOperator> allSuccs = new HashSet<>();
+ getAllSuccessors(plan, successor, allSuccs);
+ processedPredecessors.clear();
+ processedPredecessors.add(successor);
+ for (TezOperator succ : allSuccs) {
+ fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
+ }
+ // Find the common ones
+ splitters1.retainAll(splitters2);
}
+ return splitters1;
+ }
- public TezPlanContainerNode getPlanContainerNode() {
- return planContainerNode;
+ private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
+ Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
+ List<TezOperator> predecessors = plan.getPredecessors(tezOp);
+ if (predecessors != null) {
+ for (TezOperator pred : predecessors) {
+ // Skip already-processed predecessors to avoid loops
+ if (processedPredecessors.contains(pred)) {
+ continue;
+ }
+ if (pred.isSplitter()) {
+ splitters.add(pred);
+ } else if (!pred.needSegmentBelow()) {
+ processedPredecessors.add(pred);
+ fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
+ }
+ }
}
+ }
- @Override
- public void visitTezPlanContainerNode(
- TezPlanContainerNode tezPlanContainerNode)
- throws VisitorException {
- if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) {
- planContainerNode = tezPlanContainerNode;
+ private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) {
+ List<TezOperator> successors = plan.getSuccessors(tezOp);
+ if (successors != null) {
+ for (TezOperator succ : successors) {
+ if (!allSuccs.contains(succ)) {
+ allSuccs.add(succ);
+ getAllSuccessors(plan, succ, allSuccs);
+ }
}
}
-
}
private synchronized OperatorKey generateNodeOperatorKey() {
@@ -267,6 +350,21 @@ public class TezPlanContainer extends Op
scopeId = 0;
}
+ private POLoad getTmpLoad(String scope, FileSpec fileSpec){
+ POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ ld.setPc(pigContext);
+ ld.setIsTmpLoad(true);
+ ld.setLFile(fileSpec);
+ return ld;
+ }
+
+ private POStore getTmpStore(String scope, FileSpec fileSpec){
+ POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ st.setIsTmpStore(true);
+ st.setSFile(fileSpec);
+ return new POStoreTez(st);
+ }
+
@Override
public String toString() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
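
The rewiring in the hunk above can be read as a single recipe: when a splitter feeds both an operator being segmented and one of its successors, the splitter's in-memory output is materialized to a temp file and every splittee re-reads it from disk. A condensed sketch of just that step, assuming the Pig Tez planner classes from this commit are on the classpath (the method name is illustrative, not part of the commit):

    // Sketch only: materialize a common splitter's output so the DAG can be segmented.
    private void materializeSplitterOutput(TezOperPlan plan, TezOperator splitter, String scope)
            throws Exception {
        FileSpec tmp = TezCompiler.getTempFileSpec(pigContext);
        // Replace the splitter's in-memory output (POValueOutputTez leaf) with a temp POStore
        splitter.plan.remove(splitter.plan.getLeaves().get(0));
        splitter.plan.addAsLeaf(getTmpStore(scope, tmp));
        splitter.segmentBelow = true;
        splitter.setSplitter(false);
        // Each splittee now loads the materialized output instead of reading a Tez edge
        for (TezOperator succ : plan.getSuccessors(splitter)) {
            succ.plan.replace(succ.plan.getRoots().get(0), getTmpLoad(scope, tmp));
        }
    }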
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Feb 24 08:19:42 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -80,6 +81,9 @@ public class TezPrinter extends TezOpPla
printer.setVerbose(isVerbose);
printer.visit();
mStream.println();
+ } else if (edgeDesc.needsDistinctCombiner()) {
+ mStream.println("# Combine plan on edge <" + inEdge + ">");
+ mStream.println(DistinctCombiner.Combine.class.getName());
}
}
}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class BloomPackager extends Packager {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean bloomCreatedInMap;
+ private int vectorSizeBytes;
+ private int numHash;
+ private int hashType;
+ private byte bloomKeyType;
+ private boolean isCombiner;
+
+ private transient ByteArrayOutputStream baos;
+ private transient Iterator<Object> distinctKeyIter;
+
+ public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+ int numHash, int hashType) {
+ super();
+ this.bloomCreatedInMap = bloomCreatedInMap;
+ this.vectorSizeBytes = vectorSizeBytes;
+ this.numHash = numHash;
+ this.hashType = hashType;
+ }
+
+ public void setBloomKeyType(byte keyType) {
+ bloomKeyType = keyType;
+ }
+
+ public void setCombiner(boolean isCombiner) {
+ this.isCombiner = isCombiner;
+ }
+
+ @Override
+ public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+ throws ExecException {
+ this.key = key;
+ this.bags = bags;
+ this.readOnce = readOnce;
+ // Bag can be read directly and need not be materialized again
+ }
+
+ @Override
+ public Result getNext() throws ExecException {
+ try {
+ if (bloomCreatedInMap) {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+ // Same function for combiner and reducer
+ return combineBloomFilters();
+ } else {
+ if (isCombiner) {
+ return getDistinctBloomKeys();
+ } else {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+ return createBloomFilter();
+ }
+ }
+ } catch (IOException e) {
+ throw new ExecException("Error while constructing final bloom filter", e);
+ }
+ }
+
+ private Result combineBloomFilters() throws IOException {
+ // We get a bag of bloom filters. Combine them into one
+ Iterator<Tuple> iter = bags[0].iterator();
+ Tuple tup = iter.next();
+ DataByteArray bloomBytes = (DataByteArray) tup.get(0);
+ BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+ while (iter.hasNext()) {
+ tup = iter.next();
+ bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+ }
+
+ Object partition = key;
+ detachInput(); // Free up the key and bags reference
+
+ return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
+ }
+
+ private Result createBloomFilter() throws IOException {
+ // We get a bag of keys. Create a bloom filter from them
+ // First take the distinct set of keys. Not using DistinctBag, as memory should not be a problem.
+ HashSet<Object> bloomKeys = new HashSet<>();
+ Iterator<Tuple> iter = bags[0].iterator();
+ while (iter.hasNext()) {
+ bloomKeys.add(iter.next().get(0));
+ }
+
+ Object partition = key;
+ detachInput(); // Free up the key and bags reference
+
+ BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+ for (Object bloomKey: bloomKeys) {
+ Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
+ bloomFilter.add(k);
+ }
+ bloomKeys = null;
+ return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64);
+
+ }
+
+ private Result getSerializedBloomFilter(Object partition,
+ BloomFilter bloomFilter, int serializedSize) throws ExecException,
+ IOException {
+ if (baos == null) {
+ baos = new ByteArrayOutputStream(serializedSize);
+ }
+ baos.reset();
+ DataOutputStream dos = new DataOutputStream(baos);
+ bloomFilter.write(dos);
+ dos.flush();
+
+ Tuple res = mTupleFactory.newTuple(2);
+ res.set(0, partition);
+ res.set(1, new DataByteArray(baos.toByteArray()));
+
+ Result r = new Result();
+ r.result = res;
+ r.returnStatus = POStatus.STATUS_OK;
+ return r;
+ }
+
+ private Result getDistinctBloomKeys() throws ExecException {
+ if (distinctKeyIter == null) {
+ HashSet<Object> bloomKeys = new HashSet<>();
+ Iterator<Tuple> iter = bags[0].iterator();
+ while (iter.hasNext()) {
+ bloomKeys.add(iter.next().get(0));
+ }
+ distinctKeyIter = bloomKeys.iterator();
+ }
+ while (distinctKeyIter.hasNext()) {
+ Tuple res = mTupleFactory.newTuple(2);
+ res.set(0, key);
+ res.set(1, distinctKeyIter.next());
+
+ Result r = new Result();
+ r.result = res;
+ r.returnStatus = POStatus.STATUS_OK;
+ return r;
+ }
+ distinctKeyIter = null;
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+}
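
BloomPackager's two paths boil down to standard operations on Hadoop's bloom filter: OR partial filters together (map strategy, combineBloomFilters) or build one filter from a deduplicated key set (reduce strategy, createBloomFilter). A minimal standalone sketch of the OR-combine step, using the Hadoop bloom classes imported above:

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    public class BloomCombineDemo {
        public static void main(String[] args) {
            int vectorBits = 1024 * 8; // BloomFilter takes its vector size in bits
            int numHash = 3;
            // Two partial filters, e.g. built by different upstream tasks
            BloomFilter partial1 = new BloomFilter(vectorBits, numHash, Hash.MURMUR_HASH);
            BloomFilter partial2 = new BloomFilter(vectorBits, numHash, Hash.MURMUR_HASH);
            partial1.add(new Key("key1".getBytes()));
            partial2.add(new Key("key2".getBytes()));
            // The final packager ORs them into one, as combineBloomFilters() does
            partial1.or(partial2);
            System.out.println(partial1.membershipTest(new Key("key1".getBytes()))); // true
            System.out.println(partial1.membershipTest(new Key("key2".getBytes()))); // true
        }
    }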
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
+ private static final long serialVersionUID = 1L;
+
+ private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
+ private String inputKey;
+ private transient KeyValueReader reader;
+ private transient String cacheKey;
+ private int numBloomFilters;
+ private transient BloomFilter[] bloomFilters;
+
+ public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
+ super(lr);
+ this.numBloomFilters = numBloomFilters;
+ }
+
+ public void setInputKey(String inputKey) {
+ this.inputKey = inputKey;
+ }
+
+ @Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ cacheKey = "bloom-" + inputKey;
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ inputsToSkip.add(inputKey);
+ }
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException {
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ bloomFilters = (BloomFilter[]) cacheValue;
+ return;
+ }
+ LogicalInput input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + inputKey + " is missing");
+ }
+ try {
+ reader = (KeyValueReader) input.getReader();
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ while (reader.next()) {
+ if (bloomFilters == null) {
+ bloomFilters = new BloomFilter[numBloomFilters];
+ }
+ Tuple val = (Tuple) reader.getCurrentValue();
+ int index = (int) val.get(0);
+ bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+ }
+ ObjectCache.getInstance().cache(cacheKey, bloomFilters);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+
+ // If there is no bloom filter, then it means the right input was empty
+ // Skip processing
+ if (bloomFilters == null) {
+ return RESULT_EOP;
+ }
+
+ while (true) {
+ res = super.getRearrangedTuple();
+ try {
+ switch (res.returnStatus) {
+ case POStatus.STATUS_OK:
+ if (illustrator == null) {
+ Tuple result = (Tuple) res.result;
+ Byte index = (Byte) result.get(0);
+
+ // Skip the record if key is not in the bloom filter
+ if (!isKeyInBloomFilter(result.get(1))) {
+ continue;
+ }
+ PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+ NullableTuple val = new NullableTuple((Tuple)result.get(2));
+ key.setIndex(index);
+ val.setIndex(index);
+ writer.write(key, val);
+ } else {
+ illustratorMarkup(res.result, res.result, 0);
+ }
+ continue;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ case POStatus.STATUS_ERR:
+ default:
+ return res;
+ }
+ } catch (IOException ioe) {
+ int errCode = 2135;
+ String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage();
+ throw new ExecException(msg, errCode, ioe);
+ }
+ }
+ }
+
+ private boolean isKeyInBloomFilter(Object key) throws ExecException {
+ if (key == null) {
+ // Null values are dropped in an inner join, and in the case of an outer join,
+ // POBloomFilterRearrangeTez is only in the plan on the non-outer relation.
+ // So just skip them
+ return false;
+ }
+ if (bloomFilters.length == 1) {
+ // Skip computing hashcode
+ Key k = new Key(DataType.toBytes(key, keyType));
+ return bloomFilters[0].membershipTest(k);
+ } else {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ BloomFilter filter = bloomFilters[partition];
+ if (filter != null) {
+ Key k = new Key(DataType.toBytes(key, keyType));
+ return filter.membershipTest(k);
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
+ return (POBloomFilterRearrangeTez) super.clone();
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "BloomFilter Rearrange" + "["
+ + DataType.findTypeName(resultType) + "]" + "{"
+ + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+ }
+
+}
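
The membership test above hinges on probing the same partition that the build side wrote the key into: both sides must compute the partition from the key's hash code in the same way. A minimal standalone sketch of that invariant (illustrative names, not part of the commit):

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;

    final class BloomPartitioning {
        // Build and probe must agree on this mapping, or probes hit the wrong filter
        static int partitionOf(Object key, int numFilters) {
            return (key.hashCode() & Integer.MAX_VALUE) % numFilters;
        }

        static boolean mightContain(BloomFilter[] filters, Object key, byte[] keyBytes) {
            BloomFilter f = filters[partitionOf(key, filters.length)];
            // A null slot means no key was ever hashed to this partition
            return f != null && f.membershipTest(new Key(keyBytes));
        }
    }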
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * This operator writes out the key value for the hash join reduce operation, similar to POLocalRearrangeTez.
+ * In addition, it also writes out the bloom filter constructed from the join keys
+ * in the case of the bloomjoin map strategy, or the join keys themselves in the case of the reduce strategy.
+ *
+ * Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
+ * It also allows us to have lower false positives with smaller vector sizes.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
+
+ public static final String DEFAULT_BLOOM_STRATEGY = "map";
+ public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11;
+ public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3;
+ public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur";
+ public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024;
+
+ private String bloomOutputKey;
+ private boolean skipNullKeys = false;
+ private boolean createBloomInMap;
+ private int numBloomFilters;
+ private int vectorSizeBytes;
+ private int numHash;
+ private int hashType;
+
+ private transient BloomFilter[] bloomFilters;
+ private transient KeyValueWriter bloomWriter;
+ private transient PigNullableWritable nullKey;
+ private transient Tuple bloomValue;
+ private transient NullableTuple bloomNullableTuple;
+
+ public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
+ boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
+ int numHash, int hashType) {
+ super(lr);
+ this.createBloomInMap = createBloomInMap;
+ this.numBloomFilters = numBloomFilters;
+ this.vectorSizeBytes = vectorSizeBytes;
+ this.numHash = numHash;
+ this.hashType = hashType;
+ }
+
+ public static int getNumBloomFilters(Configuration conf) {
+ if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) {
+ return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1);
+ } else {
+ return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE);
+ }
+ }
+
+ public void setSkipNullKeys(boolean skipNullKeys) {
+ this.skipNullKeys = skipNullKeys;
+ }
+
+ public void setBloomOutputKey(String bloomOutputKey) {
+ this.bloomOutputKey = bloomOutputKey;
+ }
+
+ @Override
+ public boolean containsOutputKey(String key) {
+ if(super.containsOutputKey(key)) {
+ return true;
+ }
+ return bloomOutputKey.equals(key);
+ }
+
+ @Override
+ public String[] getTezOutputs() {
+ return new String[] { outputKey, bloomOutputKey };
+ }
+
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
+ if (oldOutputKey.equals(outputKey)) {
+ outputKey = newOutputKey;
+ } else if (oldOutputKey.equals(bloomOutputKey)) {
+ bloomOutputKey = newOutputKey;
+ }
+ }
+
+ @Override
+ public void attachOutputs(Map<String, LogicalOutput> outputs,
+ Configuration conf) throws ExecException {
+ super.attachOutputs(outputs, conf);
+ LogicalOutput output = outputs.get(bloomOutputKey);
+ if (output == null) {
+ throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
+ }
+ try {
+ bloomWriter = (KeyValueWriter) output.getWriter();
+ LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ bloomFilters = new BloomFilter[numBloomFilters];
+ bloomValue = mTupleFactory.newTuple(1);
+ bloomNullableTuple = new NullableTuple(bloomValue);
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+
+ PigNullableWritable key;
+ while (true) {
+ res = super.getRearrangedTuple();
+ try {
+ switch (res.returnStatus) {
+ case POStatus.STATUS_OK:
+ if (illustrator == null) {
+ Tuple result = (Tuple) res.result;
+ Byte index = (Byte) result.get(0);
+
+ Object keyObj = result.get(1);
+ if (keyObj != null) {
+ key = HDataType.getWritableComparableTypes(keyObj, keyType);
+ // Null keys cannot be part of the bloom filter.
+ // Since they are also dropped during the join, we can skip them
+ if (createBloomInMap) {
+ addKeyToBloomFilter(keyObj);
+ } else {
+ writeJoinKeyForBloom(keyObj);
+ }
+ } else if (skipNullKeys) {
+ // Inner join. So don't bother writing null key
+ continue;
+ } else {
+ if (nullKey == null) {
+ nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
+ }
+ key = nullKey;
+ }
+
+ NullableTuple val = new NullableTuple((Tuple)result.get(2));
+ key.setIndex(index);
+ val.setIndex(index);
+ writer.write(key, val);
+ } else {
+ illustratorMarkup(res.result, res.result, 0);
+ }
+ continue;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ if (this.parentPlan.endOfAllInput && createBloomInMap) {
+ // In case of Split, we will get EOP after every record.
+ // So check for endOfAllInput
+ writeBloomFilters();
+ }
+ case POStatus.STATUS_ERR:
+ default:
+ return res;
+ }
+ } catch (IOException ioe) {
+ int errCode = 2135;
+ String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage();
+ throw new ExecException(msg, errCode, ioe);
+ }
+ }
+ }
+
+ private void addKeyToBloomFilter(Object key) throws ExecException {
+ Key k = new Key(DataType.toBytes(key, keyType));
+ if (bloomFilters.length == 1) {
+ if (bloomFilters[0] == null) {
+ bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+ }
+ bloomFilters[0].add(k);
+ } else {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ BloomFilter filter = bloomFilters[partition];
+ if (filter == null) {
+ filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+ bloomFilters[partition] = filter;
+ }
+ filter.add(k);
+ }
+ }
+
+ private void writeJoinKeyForBloom(Object key) throws IOException {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ bloomValue.set(0, key);
+ bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+ }
+
+ private void writeBloomFilters() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64);
+ for (int i = 0; i < bloomFilters.length; i++) {
+ if (bloomFilters[i] != null) {
+ DataOutputStream dos = new DataOutputStream(baos);
+ bloomFilters[i].write(dos);
+ dos.flush();
+ bloomValue.set(0, new DataByteArray(baos.toByteArray()));
+ bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple);
+ baos.reset();
+ }
+ }
+ }
+
+ @Override
+ public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
+ return (POBuildBloomRearrangeTez) super.clone();
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "BuildBloom Rearrange" + "["
+ + DataType.findTypeName(resultType) + "]" + "{"
+ + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
+ }
+
+}
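
A small usage sketch of the strategy switch in getNumBloomFilters() above, assuming only the PigConfiguration constants already referenced in this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;

    public class BloomStrategyDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Map strategy (the default): a single filter unless PIG_BLOOMJOIN_NUM_FILTERS overrides it
            conf.set(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "map");
            System.out.println(POBuildBloomRearrangeTez.getNumBloomFilters(conf)); // 1
            // Reduce strategy: join keys are shuffled and 11 partitioned filters are built by default
            conf.set(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
            System.out.println(POBuildBloomRearrangeTez.getNumBloomFilters(conf)); // 11
        }
    }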
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Feb 24 08:19:42 2017
@@ -56,6 +56,7 @@ public class POCounterStatsTez extends P
private transient KeyValuesReader reader;
private transient KeyValueWriter writer;
private transient boolean finished = false;
+ private transient boolean hasNext = false;
public POCounterStatsTez(OperatorKey k) {
super(k);
@@ -88,6 +89,7 @@ public class POCounterStatsTez extends P
try {
reader = (KeyValuesReader) input.getReader();
LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ hasNext = reader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -130,12 +132,13 @@ public class POCounterStatsTez extends P
Integer key = null;
Long value = null;
// Read count of records per task
- while (reader.next()) {
+ while (hasNext) {
key = ((IntWritable)reader.getCurrentKey()).get();
for (Object val : reader.getCurrentValues()) {
value = ((LongWritable)val).get();
counterRecords.put(key, value);
}
+ hasNext = reader.next();
}
// BinInterSedes only takes String for map key
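
The hasNext change above is the recurring prefetch pattern in this commit: call reader.next() once at attach time so Tez starts fetching the input early, cache the result, and resume iteration from that cached state so the first record is not lost. A standalone illustration, with a hypothetical Reader interface standing in for Tez's KeyValuesReader:

    // Hypothetical Reader standing in for Tez's KeyValuesReader
    interface Reader {
        boolean next() throws java.io.IOException;
        Object current();
    }

    class PrefetchingConsumer {
        private final Reader reader;
        private boolean hasNext;

        PrefetchingConsumer(Reader reader) throws java.io.IOException {
            this.reader = reader;
            hasNext = reader.next(); // force the input fetch up front
        }

        void drain() throws java.io.IOException {
            while (hasNext) {
                process(reader.current());
                hasNext = reader.next(); // advance only after consuming, as in the diff
            }
        }

        void process(Object record) { System.out.println(record); }
    }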
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Feb 24 08:19:42 2017
@@ -19,6 +19,8 @@
package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -101,9 +103,13 @@ public class POFRJoinTez extends POFRJoi
LogicalInput input = inputs.get(key);
if (!this.replInputs.contains(input)) {
this.replInputs.add(input);
- this.replReaders.add((KeyValueReader) input.getReader());
+ KeyValueReader reader = (KeyValueReader) input.getReader();
+ this.replReaders.add(reader);
+ log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader);
}
}
+ // Do not force-fetch input by reading the first record. Cases like MultiQuery_Union_4 have
+ // multiple POFRJoinTez operators loading the same replicate input, which would skip records
} catch (Exception e) {
throw new ExecException(e);
}
@@ -114,6 +120,7 @@ public class POFRJoinTez extends POFRJoi
*
* @throws ExecException
*/
+ @SuppressWarnings("unchecked")
@Override
protected void setUpHashMap() throws ExecException {
@@ -121,8 +128,8 @@ public class POFRJoinTez extends POFRJoi
// where same POFRJoinTez occurs in different Split sub-plans
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
if (cacheValue != null) {
- replicates = (TupleToMapKey[]) cacheValue;
- log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
+ replicates = (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue;
+ log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
return;
}
@@ -148,7 +155,7 @@ public class POFRJoinTez extends POFRJoi
long time1 = System.currentTimeMillis();
- replicates[fragment] = null;
+ replicates.set(fragment, null);
int inputIdx = 0;
// We need to adjust the index because the number of replInputs is
// one less than the number of inputSchemas. The inputSchemas
@@ -158,7 +165,12 @@ public class POFRJoinTez extends POFRJoi
SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx];
SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx];
- TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
+ Map<Object, ArrayList<Tuple>> replicate;
+ if (keySchemaTupleFactory == null) {
+ replicate = new HashMap<Object, ArrayList<Tuple>>(4000);
+ } else {
+ replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
+ }
POLocalRearrange lr = LRs[schemaIdx];
try {
@@ -168,7 +180,8 @@ public class POFRJoinTez extends POFRJoi
}
PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey();
- if (isKeyNull(key.getValueAsPigType())) continue;
+ Object keyValue = key.getValueAsPigType();
+ if (isKeyNull(keyValue)) continue;
NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue();
// POFRJoin#getValueTuple() is reused to construct valTuple,
@@ -176,27 +189,31 @@ public class POFRJoinTez extends POFRJoi
// construct one here.
Tuple retTuple = mTupleFactory.newTuple(3);
retTuple.set(0, key.getIndex());
- retTuple.set(1, key.getValueAsPigType());
+ retTuple.set(1, keyValue);
retTuple.set(2, val.getValueAsPigType());
Tuple valTuple = getValueTuple(lr, retTuple);
- Tuple keyTuple = mTupleFactory.newTuple(1);
- keyTuple.set(0, key.getValueAsPigType());
- if (replicate.get(keyTuple) == null) {
- replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+ ArrayList<Tuple> values = replicate.get(keyValue);
+ if (values == null) {
+ if (inputSchemaTupleFactory == null) {
+ values = new ArrayList<Tuple>(1);
+ } else {
+ values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
+ }
+ replicate.put(keyValue, values);
}
- replicate.get(keyTuple).add(valTuple);
+ values.add(valTuple);
}
} catch (IOException e) {
throw new ExecException(e);
}
- replicates[schemaIdx] = replicate;
+ replicates.set(schemaIdx, replicate);
inputIdx++;
schemaIdx++;
}
long time2 = System.currentTimeMillis();
- log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+ log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
ObjectCache.getInstance().cache(cacheKey, replicates);
log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,7 @@ public class POIdentityInOutTez extends
private transient KeyValuesReader shuffleReader;
private transient boolean shuffleInput;
private transient boolean finished = false;
+ private transient boolean hasNext = false;
public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) {
super(inputRearrange);
@@ -95,9 +96,12 @@ public class POIdentityInOutTez extends
Reader r = input.getReader();
if (r instanceof KeyValueReader) {
reader = (KeyValueReader) r;
+ // Force input fetch
+ hasNext = reader.next();
} else {
shuffleInput = true;
shuffleReader = (KeyValuesReader) r;
+ hasNext = shuffleReader.next();
}
LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r);
} catch (Exception e) {
@@ -127,7 +131,7 @@ public class POIdentityInOutTez extends
return RESULT_EOP;
}
if (shuffleInput) {
- while (shuffleReader.next()) {
+ while (hasNext) {
Object curKey = shuffleReader.getCurrentKey();
Iterable<Object> vals = shuffleReader.getCurrentValues();
if (isSkewedJoin) {
@@ -139,9 +143,10 @@ public class POIdentityInOutTez extends
for (Object val : vals) {
writer.write(curKey, val);
}
+ hasNext = shuffleReader.next();
}
} else {
- while (reader.next()) {
+ while (hasNext) {
if (isSkewedJoin) {
NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
(PigNullableWritable) reader.getCurrentKey());
@@ -155,6 +160,7 @@ public class POIdentityInOutTez extends
writer.write(reader.getCurrentKey(),
reader.getCurrentValue());
}
+ hasNext = reader.next();
}
}
finished = true;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Feb 24 08:19:42 2017
@@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends
}
}
- public String getOutputKey() {
- return outputKey;
+ public boolean containsOutputKey(String key) {
+ return outputKey.equals(key);
}
public void setOutputKey(String outputKey) {
@@ -122,6 +122,10 @@ public class POLocalRearrangeTez extends
}
}
+ protected Result getRearrangedTuple() throws ExecException {
+ return super.getNextTuple();
+ }
+
@Override
public Result getNextTuple() throws ExecException {
res = super.getNextTuple();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Fri Feb 24 08:19:42 2017
@@ -51,6 +51,7 @@ public class PORankTez extends PORank im
private transient Map<Integer, Long> counterOffsets;
private transient Configuration conf;
private transient boolean finished = false;
+ private transient Boolean hasFirstRecord;
public PORankTez(PORank copy) {
super(copy);
@@ -100,6 +101,7 @@ public class PORankTez extends PORank im
try {
reader = (KeyValueReader) input.getReader();
LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
+ hasFirstRecord = reader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -140,9 +142,18 @@ public class PORankTez extends PORank im
Result inp = null;
try {
- while (reader.next()) {
- inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
- return addRank(inp);
+ if (hasFirstRecord != null) {
+ if (hasFirstRecord) {
+ hasFirstRecord = null;
+ inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+ return addRank(inp);
+ }
+ hasFirstRecord = null;
+ } else {
+ while (reader.next()) {
+ inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+ return addRank(inp);
+ }
}
} catch (IOException e) {
throw new ExecException(e);
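
PORankTez uses a boxed Boolean for its prefetch so it can distinguish three states: a prefetched record is pending (TRUE), the prefetch found the input empty (FALSE), and the prefetch has already been consumed (null), after which the normal reader loop takes over. A standalone sketch of that tri-state handoff (illustrative class, not part of the commit):

    class TriStatePrefetch {
        // TRUE: first record waiting; FALSE: input was empty; null: already consumed
        private Boolean hasFirstRecord;

        void attach(boolean prefetched) { hasFirstRecord = prefetched; }

        String step() {
            if (hasFirstRecord != null) {
                boolean pending = hasFirstRecord;
                hasFirstRecord = null; // the cached state is consumed exactly once
                return pending ? "emit the already-fetched record" : "return EOP";
            }
            return "call reader.next() as usual";
        }
    }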
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Fri Feb 24 08:19:42 2017
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.backend.executionengine.ExecException;
@@ -32,12 +34,16 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +54,7 @@ import org.apache.tez.runtime.library.co
public class POShuffleTezLoad extends POPackage implements TezInput {
private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
protected List<String> inputKeys = new ArrayList<String>();
private boolean isSkewedJoin = false;
@@ -61,6 +68,7 @@ public class POShuffleTezLoad extends PO
private transient WritableComparator groupingComparator = null;
private transient Configuration conf;
private transient int accumulativeBatchSize;
+ private transient boolean readOnceOneBag;
public POShuffleTezLoad(POPackage pack) {
super(pack);
@@ -101,7 +109,10 @@ public class POShuffleTezLoad extends PO
// - Input key will be repeated, but index would be same within a TezInput
if (!this.inputs.contains(input)) {
this.inputs.add(input);
- this.readers.add((KeyValuesReader)input.getReader());
+ KeyValuesReader reader = (KeyValuesReader)input.getReader();
+ this.readers.add(reader);
+ LOG.info("Attached input from vertex " + inputKey
+ + " : input=" + input + ", reader=" + reader);
}
}
@@ -117,6 +128,13 @@ public class POShuffleTezLoad extends PO
for (int i = 0; i < numTezInputs; i++) {
finished[i] = !readers.get(i).next();
}
+
+ this.readOnceOneBag = (numInputs == 1)
+ && (pkgr instanceof CombinerPackager
+ || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
+ if (readOnceOneBag) {
+ readOnce[0] = true;
+ }
} catch (Exception e) {
throw new ExecException(e);
}
@@ -187,43 +205,47 @@ public class POShuffleTezLoad extends PO
} else {
- for (int i = 0; i < numInputs; i++) {
- bags[i] = new InternalCachedBag(numInputs);
- }
-
- if (numTezInputs == 1) {
- do {
- Iterable<Object> vals = readers.get(0).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[0] = !readers.get(0).next();
- if (finished[0]) {
- break;
- }
- cur = readers.get(0).getCurrentKey();
- } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ if (readOnceOneBag) {
+ bags[0] = new TezReadOnceBag(pkgr, min);
} else {
- for (int i = 0; i < numTezInputs; i++) {
- if (!finished[i]) {
- cur = readers.get(i).getCurrentKey();
- // We need to loop in case of Grouping Comparators
- while (groupingComparator.compare(min, cur) == 0) {
- Iterable<Object> vals = readers.get(i).getCurrentValues();
- for (Object val : vals) {
- NullableTuple nTup = (NullableTuple) val;
- int index = nTup.getIndex();
- Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
- bags[index].add(tup);
- }
- finished[i] = !readers.get(i).next();
- if (finished[i]) {
- break;
- }
+ for (int i = 0; i < numInputs; i++) {
+ bags[i] = new InternalCachedBag(numInputs);
+ }
+
+ if (numTezInputs == 1) {
+ do {
+ Iterable<Object> vals = readers.get(0).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ break;
+ }
+ cur = readers.get(0).getCurrentKey();
+ } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators
+ } else {
+ for (int i = 0; i < numTezInputs; i++) {
+ if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
+ // We need to loop in case of Grouping Comparators
+ while (groupingComparator.compare(min, cur) == 0) {
+ Iterable<Object> vals = readers.get(i).getCurrentValues();
+ for (Object val : vals) {
+ NullableTuple nTup = (NullableTuple) val;
+ int index = nTup.getIndex();
+ Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index);
+ bags[index].add(tup);
+ }
+ finished[i] = !readers.get(i).next();
+ if (finished[i]) {
+ break;
+ }
+ cur = readers.get(i).getCurrentKey();
+ }
}
}
}
@@ -383,4 +405,74 @@ public class POShuffleTezLoad extends PO
}
+ private class TezReadOnceBag extends ReadOnceBag {
+
+ private static final long serialVersionUID = 1L;
+ private Iterator<Object> iter;
+
+ public TezReadOnceBag(Packager pkgr,
+ PigNullableWritable currentKey) throws IOException {
+ this.pkgr = pkgr;
+ this.keyWritable = currentKey;
+ this.iter = readers.get(0).getCurrentValues().iterator();
+ }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new TezReadOnceBagIterator();
+ }
+
+ private class TezReadOnceBagIterator implements Iterator<Tuple> {
+
+ @Override
+ public boolean hasNext() {
+ if (iter.hasNext()) {
+ return true;
+ } else {
+ try {
+ finished[0] = !readers.get(0).next();
+ if (finished[0]) {
+ return false;
+ }
+ // Currently the combiner is not applied when a secondary key (grouping comparator) is used,
+ // but that might change in the future. So check if the next key is the same and return its values
+ Object cur = readers.get(0).getCurrentKey();
+ if (groupingComparator.compare(keyWritable, cur) == 0) {
+ iter = readers.get(0).getCurrentValues().iterator();
+ // A key should have at least one value, but do a check just for safety
+ if (iter.hasNext()) {
+ return true;
+ } else {
+ throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values");
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+ }
+ }
+ }
+
+ @Override
+ public Tuple next() {
+ NullableTuple ntup = (NullableTuple) iter.next();
+ int index = ntup.getIndex();
+ Tuple ret = null;
+ try {
+ ret = pkgr.getValueTuple(keyWritable, ntup, index);
+ } catch (ExecException e) {
+ throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+ }
+ }
+
+ }
+
+
}
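
TezReadOnceBag above streams the reducer's value iterator straight through the packager instead of first copying every value into an InternalCachedBag (which can spill to disk). A standalone sketch of the single-pass idea (String stands in for Tuple):

    import java.util.Arrays;
    import java.util.Iterator;

    // Single-pass bag: values are consumed directly from the source iterator
    class ReadOnceValues implements Iterable<String> {
        private final Iterator<String> source; // stands in for getCurrentValues()

        ReadOnceValues(Iterator<String> source) { this.source = source; }

        @Override
        public Iterator<String> iterator() { return source; }

        public static void main(String[] args) {
            ReadOnceValues bag = new ReadOnceValues(Arrays.asList("a", "b").iterator());
            for (String v : bag) System.out.println(v);
            // A second for-loop over the same bag would see an exhausted iterator:
            // read-once means exactly one traversal.
        }
    }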
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,7 @@ public class POShuffledValueInputTez ext
private transient Iterator<KeyValueReader> readers;
private transient KeyValueReader currentReader;
private transient Configuration conf;
+ private transient Boolean hasFirstRecord;
public POShuffledValueInputTez(OperatorKey k) {
super(k);
@@ -98,6 +99,8 @@ public class POShuffledValueInputTez ext
}
readers = readersList.iterator();
currentReader = readers.next();
+ // Force fetching the first record so the input is initialized up front
+ hasFirstRecord = currentReader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -111,7 +114,15 @@ public class POShuffledValueInputTez ext
}
do {
- if (currentReader.next()) {
+ if (hasFirstRecord != null) {
+ if (hasFirstRecord) {
+ hasFirstRecord = null;
+ Tuple origTuple = (Tuple) currentReader.getCurrentValue();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ hasFirstRecord = null;
+ } else if (currentReader.next()) {
Tuple origTuple = (Tuple) currentReader.getCurrentValue();
Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
return new Result(POStatus.STATUS_OK, copy);
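
Here hasFirstRecord acts as a tri-state flag: the first record is fetched eagerly when the input is attached (forcing the Tez input fetch), replayed on the first call for a tuple, and the flag then cleared to null so later calls fall through to the normal reader.next() path. A minimal sketch of this prefetch-and-replay pattern, assuming a hypothetical Reader interface:

    import java.io.IOException;

    class PrefetchSketch {
        interface Reader {
            boolean next() throws IOException;
            Object getCurrentValue() throws IOException;
        }

        private final Reader reader;
        // Tri-state: TRUE = prefetched record pending, FALSE = input was
        // empty at attach time, null = prefetch already consumed.
        private Boolean hasFirstRecord;

        PrefetchSketch(Reader reader) throws IOException {
            this.reader = reader;
            this.hasFirstRecord = reader.next();  // force the first fetch eagerly
        }

        // Returns the next value, or null when the input is exhausted.
        Object nextValue() throws IOException {
            if (hasFirstRecord != null) {
                boolean pending = hasFirstRecord;
                hasFirstRecord = null;            // prefetch consumed either way
                return pending ? reader.getCurrentValue() : null;
            }
            return reader.next() ? reader.getCurrentValue() : null;
        }
    }
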
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Feb 24 08:19:42 2017
@@ -60,6 +60,8 @@ public class POSimpleTezLoad extends POL
private transient Configuration conf;
private transient boolean finished = false;
private transient TezCounter inputRecordCounter;
+ private transient boolean initialized;
+ private transient boolean noTupleCopy;
public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
super(k, loader);
@@ -149,7 +151,13 @@ public class POSimpleTezLoad extends POL
} else {
Result res = new Result();
Tuple next = (Tuple) reader.getCurrentValue();
- res.result = next;
+ if (!initialized) {
+ noTupleCopy = mTupleFactory.newTuple(1).getClass().isInstance(next);
+ initialized = true;
+ }
+ // Some loaders return implementations of DefaultTuple instead of
+ // BinSedesTuple. In that case, rewrap the fields in a BinSedesTuple.
+ res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll());
res.returnStatus = POStatus.STATUS_OK;
if (inputRecordCounter != null) {
inputRecordCounter.increment(1);
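
The initialization above decides once, on the first record, whether the loader already produces the tuple factory's own class (e.g. BinSedesTuple); if so, every subsequent tuple is passed through untouched, otherwise its fields are rewrapped. A hedged sketch of that one-time class check, with stand-in Tuple/TupleFactory types:

    import java.util.List;

    class TupleClassCheckSketch {
        // Stand-ins for Pig's Tuple/TupleFactory, for illustration only.
        interface Tuple { List<Object> getAll(); }
        interface TupleFactory {
            Tuple newTuple(int size);
            Tuple newTupleNoCopy(List<Object> fields);
        }

        private boolean initialized;
        private boolean noTupleCopy;

        // Decide once, on the first record, whether rewrapping is needed,
        // then reuse that decision for every subsequent record.
        Tuple normalize(TupleFactory factory, Tuple next) {
            if (!initialized) {
                noTupleCopy = factory.newTuple(1).getClass().isInstance(next);
                initialized = true;
            }
            return noTupleCopy ? next : factory.newTupleNoCopy(next.getAll());
        }
    }
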
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Fri Feb 24 08:19:42 2017
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
throw new ExecException(e);
}
- // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
- if (outputs.size() > 1) {
- CounterGroup multiStoreGroup = processorContext.getCounters()
- .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- if (multiStoreGroup == null) {
- processorContext.getCounters().addGroup(
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- }
- String name = MRPigStatsUtil.getMultiStoreCounterName(this);
- if (name != null) {
- outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
- }
+ // Even if there is a single HDFS output, we add the multi-store counter.
+ // That makes it easier for the user to see the record count for a
+ // particular store in the DAG counters.
+ CounterGroup multiStoreGroup = processorContext.getCounters()
+ .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ if (multiStoreGroup == null) {
+ processorContext.getCounters().addGroup(
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ }
+ String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+ if (name != null) {
+ outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
}
}
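
With this change the multi-store counter group is registered even for a single store, so per-store record counts always show up in the DAG counters. A toy register-or-reuse sketch against a map-based registry (illustrative only; the real code goes through Tez's CounterGroup API as shown above):

    import java.util.HashMap;
    import java.util.Map;

    class CounterSketch {
        // A toy counter registry standing in for Tez's counter groups.
        static class Counters {
            private final Map<String, Map<String, Long>> groups = new HashMap<>();

            Map<String, Long> getOrAddGroup(String group) {
                return groups.computeIfAbsent(group, g -> new HashMap<>());
            }
        }

        public static void main(String[] args) {
            Counters counters = new Counters();
            // Register the per-store counter even when there is only one store.
            Map<String, Long> multiStore = counters.getOrAddGroup("MultiStoreCounters");
            String name = "Output records in: mystore";   // illustrative counter name
            multiStore.putIfAbsent(name, 0L);
            multiStore.merge(name, 1L, Long::sum);        // increment per record written
            System.out.println(multiStore);
        }
    }
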
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,7 @@ public class POValueInputTez extends Phy
private transient KeyValuesReader shuffleReader;
private transient boolean shuffleInput;
private transient boolean hasNext;
+ private transient Boolean hasFirstRecord;
public POValueInputTez(OperatorKey k) {
super(k);
@@ -92,6 +93,8 @@ public class POValueInputTez extends Phy
Reader r = input.getReader();
if (r instanceof KeyValueReader) {
reader = (KeyValueReader) r;
+ // Force fetching the first record so the input is initialized up front
+ hasFirstRecord = reader.next();
} else {
shuffleInput = true;
shuffleReader = (KeyValuesReader) r;
@@ -118,10 +121,22 @@ public class POValueInputTez extends Phy
}
hasNext = shuffleReader.next();
}
- } else if (reader.next()) {
- Tuple origTuple = (Tuple) reader.getCurrentValue();
- Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
- return new Result(POStatus.STATUS_OK, copy);
+ } else {
+ if (hasFirstRecord != null) {
+ if (hasFirstRecord) {
+ hasFirstRecord = null;
+ Tuple origTuple = (Tuple) reader.getCurrentValue();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ hasFirstRecord = null;
+ } else {
+ while (reader.next()) {
+ Tuple origTuple = (Tuple) reader.getCurrentValue();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ }
}
finished = true;
// For certain operators (such as STREAM), we could still have some work
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Feb 24 08:19:42 2017
@@ -69,6 +69,11 @@ public class CombinerOptimizer extends T
}
for (TezOperator from : predecessors) {
+ PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+ if (!combinePlan.isEmpty()) {
+ // Cases like bloom join already have the combine plan set
+ continue;
+ }
List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
if (rearranges.isEmpty()) {
continue;
@@ -77,7 +82,7 @@ public class CombinerOptimizer extends T
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
@@ -90,7 +95,6 @@ public class CombinerOptimizer extends T
// Detected the POLocalRearrange -> POPackage pattern. Let's add
// combiner if possible.
- PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
if(!combinePlan.isEmpty()) {
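
The lookup of combinePlan moves ahead of the rearrange scan so edges that already carry a combine plan (as in bloom join) are skipped before any work is done, and the connecting POLocalRearrangeTez is now matched with containsOutputKey(), suggesting an operator can feed several output keys. A guess at the shape of such an API, not the actual POLocalRearrangeTez implementation:

    import java.util.HashSet;
    import java.util.Set;

    class MultiOutputRearrangeSketch {
        // Hypothetical: an operator feeding several downstream vertices keeps
        // a set of output keys instead of a single one, so callers ask
        // containsOutputKey(...) rather than comparing one getOutputKey() value.
        private final Set<String> outputKeys = new HashSet<>();

        void addOutputKey(String key) {
            outputKeys.add(key);
        }

        boolean containsOutputKey(String key) {
            return outputKeys.contains(key);
        }
    }
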
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -65,11 +66,6 @@ public class LoaderProcessor extends Tez
this.jobConf.setBoolean("mapred.mapper.new-api", true);
this.jobConf.setClass("mapreduce.inputformat.class",
PigInputFormat.class, InputFormat.class);
- try {
- this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
- } catch (IOException e) {
- throw new VisitorException(e);
- }
}
/**
@@ -175,6 +171,7 @@ public class LoaderProcessor extends Tez
// splits can be moved to if(loads) block below
int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
tezOp.setRequestedParallelism(parallelism);
+ tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
}
return lds;
}
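
Recording the total input file size on the vertex gives later optimizer stages a data-volume signal for parallelism estimation, in the spirit of InputSizeReducerEstimator. A hedged sketch of bytes-per-task estimation; the constants are illustrative, not Pig's configured defaults:

    class ParallelismEstimateSketch {
        // Estimate a task count from total input bytes, clamped to a maximum.
        // 1 GB per task and a cap of 999 are illustrative values only.
        static int estimateTasks(long totalInputBytes, long bytesPerTask, int maxTasks) {
            int tasks = (int) Math.ceil((double) totalInputBytes / bytesPerTask);
            return Math.max(1, Math.min(tasks, maxTasks));
        }

        public static void main(String[] args) {
            long total = 10L * 1024 * 1024 * 1024;                    // 10 GB of input
            System.out.println(estimateTasks(total, 1L << 30, 999));  // -> 10
        }
    }
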
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Feb 24 08:19:42 2017
@@ -153,6 +153,8 @@ public class MultiQueryOptimizerTez exte
}
}
if (getPlan().getSuccessors(successor) != null) {
+ nonPackageInputSuccessors.clear();
+ toMergeSuccessors.clear();
for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
if (succSuccessor.isUnion()) {
if (!(unionOptimizerOn &&
@@ -171,7 +173,13 @@ public class MultiQueryOptimizerTez exte
continue;
}
}
- toMergeSuccessors.add(succSuccessor);
+ if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
+ // The output goes to a scalar or POFRJoinTez in the union operator.
+ // Ensure it is the only such successor, to avoid parallel edges.
+ canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
+ } else {
+ toMergeSuccessors.add(succSuccessor);
+ }
List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
if (unionSuccessors != null) {
for (TezOperator unionSuccessor : unionSuccessors) {
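
The canMerge update exploits Set.add() returning false for duplicates: the first scalar/POFRJoinTez-style successor is admitted, and any further one feeding the same vertex vetoes the merge, since merging would create parallel edges between the same pair of vertices. A small self-contained illustration of the idiom:

    import java.util.HashSet;
    import java.util.Set;

    class SetAddVetoSketch {
        public static void main(String[] args) {
            Set<String> seen = new HashSet<>();
            boolean canMerge = true;
            for (String successor : new String[] {"scalarA", "scalarA"}) {
                // Set.add returns false on a duplicate, flipping canMerge off.
                canMerge = canMerge ? seen.add(successor) : false;
            }
            System.out.println(canMerge);   // false: duplicate successor vetoed
        }
    }
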
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Feb 24 08:19:42 2017
@@ -115,11 +115,16 @@ public class ParallelismSetter extends T
} else if (pc.defaultParallel != -1) {
parallelism = pc.defaultParallel;
}
+ if (parallelism == 0) {
+ // We still need to produce an empty output file.
+ // Even if the user sets PARALLEL 0, MapReduce runs one reducer.
+ parallelism = 1;
+ }
boolean overrideRequestedParallelism = false;
if (parallelism != -1
&& autoParallelismEnabled
- && tezOp.isIntermediateReducer()
&& !tezOp.isDontEstimateParallelism()
+ && tezOp.isIntermediateReducer()
&& tezOp.isOverrideIntermediateParallelism()) {
overrideRequestedParallelism = true;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Fri Feb 24 08:19:42 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,8 @@ public class TezEstimatedParallelismClea
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
- tezOp.setEstimatedParallelism(-1);
+ if (!tezOp.isDontEstimateParallelism()) {
+ tezOp.setEstimatedParallelism(-1);
+ }
}
}