You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/26 17:40:36 UTC
svn commit: r1780431 [1/2] - in /pig/trunk: ./
src/docs/src/documentation/content/xdocs/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/o...
Author: rohini
Date: Thu Jan 26 17:40:35 2017
New Revision: 1780431
URL: http://svn.apache.org/viewvc?rev=1780431&view=rev
Log:
PIG-4963: Add a Bloom join (rohini)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
pig/trunk/test/e2e/pig/tests/join.conf
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
pig/trunk/test/e2e/pig/build.xml
pig/trunk/test/e2e/pig/tests/multiquery.conf
pig/trunk/test/e2e/pig/tests/orc.conf
pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 26 17:40:35 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
IMPROVEMENTS
+PIG-4963: Add a Bloom join (rohini)
+
PIG-3938: Add LoadCaster to EvalFunc (knoguchi)
PIG-5105: Tez unit tests failing with "Argument list too long" (rohini)
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Thu Jan 26 17:40:35 2017
@@ -6955,7 +6955,7 @@ public class SimpleCustomPartitioner ext
<table>
<tr>
<td>
- <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n]; </p>
+ <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'bloom' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n]; </p>
</td>
</tr>
</table></section>
@@ -7004,7 +7004,16 @@ public class SimpleCustomPartitioner ext
<p>Use to perform replicated joins (see <a href="perf.html#replicated-joins">Replicated Joins</a>).</p>
</td>
</tr>
-
+
+ <tr>
+ <td>
+ <p>'bloom'</p>
+ </td>
+ <td>
+ <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+ </td>
+ </tr>
+
<tr>
<td>
<p>'skewed'</p>
@@ -7142,7 +7151,7 @@ DUMP X;
<tr>
<td>
<p>alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column
- [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; </p>
+ [USING 'replicated' | 'bloom' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; </p>
</td>
</tr>
</table>
@@ -7213,7 +7222,7 @@ DUMP X;
</td>
</tr>
- <tr>
+ <tr>
<td>
<p>USING</p>
</td>
@@ -7230,8 +7239,18 @@ DUMP X;
<p>Only left outer join is supported for replicated joins.</p>
</td>
</tr>
-
- <tr>
+
+ <tr>
+ <td>
+ <p>'bloom'</p>
+ </td>
+ <td>
+ <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+ <p>Full outer join is not supported for bloom joins.</p>
+ </td>
+ </tr>
+
+ <tr>
<td>
<p>'skewed'</p>
</td>
@@ -7324,6 +7343,13 @@ B = LOAD 'tiny';
C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';
</source>
+<p>This example shows a bloom right outer join.</p>
+<source>
+A = LOAD 'large';
+B = LOAD 'small';
+C= JOIN A BY $0 RIGHT, B BY $0 USING 'bloom';
+</source>
+
<p>This example shows a skewed full outer join.</p>
<source>
A = LOAD 'studenttab' as (name, age, gpa);
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Thu Jan 26 17:40:35 2017
@@ -1202,6 +1202,100 @@ gets 1 GB of memory. Please share your o
</section>
<!-- END FRAGMENT REPLICATE JOINS-->
+<!-- BLOOM JOINS-->
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="bloom-joins">
+<title>Bloom Joins</title>
+<p>Bloom join is a special type of join where a bloom filter is constructed using join keys of one relation and
+used to filter records of the other relations before doing a regular hash join.
+The amount of data sent to the reducers will be a lot less, depending upon the number of records that are filtered on the map side.
+Bloom join is very useful in cases where the number of matching records between relations in a join is small
+compared to the total number of records, allowing many to be filtered before the join.
+Before bloom join was added as a type of join, users achieved the same functionality by using
+the <a href="func.html#bloom">builtin bloom udfs</a>, which is not as efficient and requires more lines of code as well.
+Currently bloom join is only implemented in Tez execution mode. Builtin bloom udfs have to be used for other execution modes.</p>
+
+<section>
+<title>Usage</title>
+<p>Perform a bloom join with the USING clause (see <a href="basic.html#join-inner">JOIN (inner)</a> and <a href="basic.html#join-outer">JOIN (outer)</a>).
+In this example, a large relation is joined with two smaller relations. Note that the large relation comes first followed by the smaller relations.
+Bloom filter is built from join keys of the right most relation which is small and the filter is applied on the big and medium relations.
+None of the relations are required to fit into main memory. </p>
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+medium = LOAD 'medium_data' AS (m1,m2,m3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN big BY b1, medium BY m1, small BY s1 USING 'bloom';
+</source>
+
+<p>
+In the case of inner join and right outer join, the right most relation is used for building the bloom filter
+and the users are expected to specify the smaller dataset as the right most relation.
+But in the case of left outer join, the left most relation is used for building the bloom filter and is expected to be the smaller dataset.
+This is because all records of the outer relation should be in the result and no records can be filtered.
+If the left relation turns out to be the bigger dataset, it would not be as efficient to build the bloom filter on the bigger dataset.
+But it might still perform better than a regular join if it is able to filter a lot of records from the right relation.
+</p>
+
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN small BY s1 LEFT, big BY b1 USING 'bloom';
+</source>
+</section>
+
+<section>
+<title>Conditions</title>
+<ul>
+<li>Bloom join cannot be used with a FULL OUTER join.</li>
+<li>If the underlying data is sufficiently skewed, bloom join might not help. Skewed join can be considered for those cases.</li>
+</ul>
+</section>
+
+<section>
+<title>Tuning options</title>
+<p>
+There are multiple <a href="start.html#properties">pig properties</a> that can be configured to construct a more efficient bloom filter.
+See <a href="http://en.wikipedia.org/wiki/Bloom_filter">Bloom Filter</a> for a discussion of how to select the number of bits and the number of hash functions.
+Easier option would be to search for "bloom filter calculator" in a search engine and use one of the online bloom filter calculators available to arrive at the desired values.
+</p>
+<ul>
+<li>pig.bloomjoin.strategy - The valid values for this are 'map' and 'reduce'. Default value is map.
+Bloom join has two different kinds of implementations to be more efficient in different cases.
+In general, there is an extra reduce step in the DAG for construction of the bloom filter(s).
+<ul>
+<li>map - In each map, bloom filters are computed on the join keys partitioned by the hashcode of the key
+with pig.bloomjoin.num.filters number of partitions.
+Bloom filters for each partition from different maps are then combined in the reducers producing one bloom filter per partition.
+The default value of pig.bloomjoin.num.filters is 1 for this strategy and so usually only one bloom filter is created.
+This is efficient and fast if there is a small number of maps (&lt;10) and the number of distinct keys is not too high.
+It can be faster with larger number of maps and even with bigger bloom vector sizes,
+ but the amount of data shuffled to the reducer for aggregation becomes huge making it inefficient.</li>
+<li>reduce - Join keys are sent from the map to the reducer partitioned by hashcode of the key with
+pig.bloomjoin.num.filters number of partitions. In the reducers, one bloom filter is then computed per partition.
+Number of reducers are set equal to the number of partitions allowing for each bloom filter to be computed in parallel.
+The default value of pig.bloomjoin.num.filters is 11 for this strategy.
+This is efficient for larger datasets with lot of maps or very large bloom vector size.
+In this case size of keys sent to the reducer is smaller than sending bloom filters to reducer for aggregation making it efficient.</li>
+</ul>
+</li>
+<li>pig.bloomjoin.num.filters - The number of bloom filters that will be created. Default is 1 for map strategy and 11 for reduce strategy.</li>
+<li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
+A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).</li>
+<li>pig.bloomjoin.hash.type - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.functions - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+The higher the number, the lower the false positives. Too high a value can increase the cpu time. Default value is 3.</li>
+</ul>
+</section>
+
+</section>
+<!-- END BLOOM JOINS-->
+
<!-- +++++++++++++++++++++++++++++++ -->
<!-- SKEWED JOINS-->
<section id="skewed-joins">
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Jan 26 17:40:35 2017
@@ -134,6 +134,58 @@ public class PigConfiguration {
public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
/**
+ * Bloom join has two different kinds of implementations.
+ * <ul>
+ * <li>map <br>
+ * In each map, bloom filters are computed on the join keys partitioned by
+ * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of
+ * partitions. Bloom filters from different maps are then combined in the
+ * reducer producing one bloom filter per partition. This is efficient and
+ * fast if there are smaller number of maps (<10) and the number of
+ * distinct keys are not too high. It can be faster with larger number of
+ * maps and even with bigger bloom vector sizes, but the amount of data
+ * shuffled to the reducer for aggregation becomes huge making it
+ * inefficient.</li>
+ * <li>reduce <br>
+ * Join keys are sent from the map to the reducer partitioned by hashcode of
+ * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One
+ * bloom filter is then created per partition. This is efficient for larger
+ * datasets with lot of maps or very large
+ * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case size of keys sent
+ * to the reducer is smaller than sending bloom filters to reducer for
+ * aggregation making it efficient.</li>
+ * </ul>
+ * Default value is map.
+ */
+ public static final String PIG_BLOOMJOIN_STRATEGY = "pig.bloomjoin.strategy";
+
+ /**
+ * The number of bloom filters that will be created.
+ * Default is 1 for map strategy and 11 for reduce strategy.
+ */
+ public static final String PIG_BLOOMJOIN_NUM_FILTERS = "pig.bloomjoin.num.filters";
+
+ /**
+ * The size in bytes of the bit vector to be used for the bloom filter.
+ * A bigger vector size will be needed when the number of distinct keys is higher.
+ * Default value is 1048576 (1MB).
+ */
+ public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = "pig.bloomjoin.vectorsize.bytes";
+
+ /**
+ * The type of hash function to use. Valid values are jenkins and murmur.
+ * Default is murmur.
+ */
+ public static final String PIG_BLOOMJOIN_HASH_TYPE = "pig.bloomjoin.hash.type";
+
+ /**
+ * The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+ * Higher the number lower the false positives. Too high a value can increase the cpu time.
+ * Default value is 3.
+ */
+ public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
+
+ /**
* This key used to control the maximum size loaded into
* the distributed cache when doing fragment-replicated join
*/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Jan 26 17:40:35 2017
@@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV
try{
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
- if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN
+ || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+ // Bloom join is not implemented in mapreduce mode and falls back to regular join
curMROp.markRegularJoin();
} else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Jan 26 17:40:35 2017
@@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -160,7 +161,7 @@ public class PigCombiner {
// tuples out of the getnext() call of POJoinPackage
// In this case, we process till we see EOP from
// POJoinPacakage.getNext()
- if (pack.getPkgr() instanceof JoinPackager)
+ if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Jan 26 17:40:35 2017
@@ -21,14 +21,16 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -105,7 +107,7 @@ public class EndOfAllInputSetter extends
public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
endOfAllInputFlag = true;
}
-
+
@Override
public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
endOfAllInputFlag = true;
@@ -122,6 +124,13 @@ public class EndOfAllInputSetter extends
}
}
+ @Override
+ public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
+ if (lr instanceof POBuildBloomRearrangeTez) {
+ endOfAllInputFlag = true;
+ }
+ super.visitLocalRearrange(lr);
+ }
/**
* @return if end of all input is present
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Jan 26 17:40:35 2017
@@ -441,6 +441,10 @@ public abstract class PhysicalOperator e
public void reset() {
}
+ public boolean isEndOfAllInput() {
+ return parentPlan.endOfAllInput;
+ }
+
/**
* @return PigProgressable stored in threadlocal
*/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Jan 26 17:40:35 2017
@@ -51,13 +51,13 @@ public class Packager implements Illustr
protected DataBag[] bags;
public static enum PackageType {
- GROUP, JOIN
+ GROUP, JOIN, BLOOMJOIN
};
protected transient Illustrator illustrator = null;
// The key being worked on
- Object key;
+ protected Object key;
// marker to indicate if key is a tuple
protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@ public class Packager implements Illustr
protected boolean isKeyCompound = false;
// key's type
- byte keyType;
+ protected byte keyType;
// The number of inputs to this
// co-group. 0 indicates a distinct, which means there will only be a
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan 26 17:40:35 2017
@@ -479,7 +479,7 @@ public class TezDagBuilder extends TezOp
POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
byte keyType = lr.getKeyType();
setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
// In case of secondary key sort, main key type is the actual key type
@@ -540,26 +540,36 @@ public class TezDagBuilder extends TezOp
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
out.setUserPayload(payLoad);
+ in.setUserPayload(payLoad);
+ // Remove combiner and reset payload
if (!combinePlan.isEmpty()) {
boolean noCombineInReducer = false;
+ boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
- if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+ if (edge.getCombinerInReducer() != null) {
+ noCombineInReducer = !edge.getCombinerInReducer();
+ } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
} else {
noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
}
- if (noCombineInReducer) {
+ if (noCombineInReducer || noCombineInMapper) {
log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
conf.unset("pig.combinePlan");
conf.unset("pig.combine.package");
conf.unset("pig.map.keytype");
- payLoad = TezUtils.createUserPayloadFromConf(conf);
+ UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
+ if (noCombineInMapper) {
+ out.setUserPayload(payLoadWithoutCombiner);
+ }
+ if (noCombineInReducer) {
+ in.setUserPayload(payLoadWithoutCombiner);
+ }
}
}
- in.setUserPayload(payLoad);
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
@@ -717,7 +727,7 @@ public class TezDagBuilder extends TezOp
PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
for (POLocalRearrangeTez lr : lrs) {
if (lr.isConnectedToPackage()
- && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+ && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
localRearrangeMap.put((int) lr.getIndex(), inputKey);
if (isVertexGroup) {
isMergedInput = true;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Thu Jan 26 17:40:35 2017
@@ -32,10 +32,12 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.hash.Hash;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
@@ -44,8 +46,10 @@ import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -82,7 +86,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -110,6 +117,7 @@ import org.apache.pig.impl.builtin.GetMe
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
@@ -167,6 +175,10 @@ public class TezCompiler extends PhyPlan
private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
+ // Contains the inputs to operator like join, with the list maintaining the
+ // same order of join from left to right
+ private Map<TezOperator, List<TezOperator>> inputsMap;
+
public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -175,6 +187,8 @@ public class TezCompiler extends PhyPlan
private boolean optimisticFileConcatenation = false;
private List<String> readOnceLoadFuncs = null;
+ private Configuration conf;
+
private POLocalRearrangeTezFactory localRearrangeFactory;
public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -184,6 +198,7 @@ public class TezCompiler extends PhyPlan
this.pigContext = pigContext;
pigProperties = pigContext.getProperties();
+ conf = ConfigurationUtil.toConfiguration(pigProperties, false);
splitsSeen = Maps.newHashMap();
tezPlan = new TezOperPlan();
nig = NodeIdGenerator.getGenerator();
@@ -197,6 +212,7 @@ public class TezCompiler extends PhyPlan
scope = roots.get(0).getOperatorKey().getScope();
localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
phyToTezOpMap = Maps.newHashMap();
+ inputsMap = Maps.newHashMap();
fileConcatenationThreshold = Integer.parseInt(pigProperties
.getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -894,6 +910,7 @@ public class TezCompiler extends PhyPlan
public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
try {
blocking();
+ inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs)));
TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
curTezOp.setRequestedParallelism(op.getRequestedParallelism());
if (op.isCross()) {
@@ -1340,6 +1357,9 @@ public class TezCompiler extends PhyPlan
} else if (op.getNumInps() > 1) {
curTezOp.markCogroup();
}
+ } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+ curTezOp.markRegularJoin();
+ addBloomToJoin(op, curTezOp);
}
} catch (Exception e) {
int errCode = 2034;
@@ -1348,6 +1368,132 @@ public class TezCompiler extends PhyPlan
}
}
+    /**
+     * Rewires the inputs of a bloom join so that a bloom filter built from one
+     * input is applied to the remaining inputs.
+     * <p>
+     * Three changes are made to the Tez plan:
+     * <ol>
+     * <li>the leaf local rearrange of the build input is replaced with a
+     * {@link POBuildBloomRearrangeTez};</li>
+     * <li>a new vertex is added that produces the final bloom filter(s); it is
+     * fed by the build input over an edge that carries a {@link BloomPackager}
+     * combine plan;</li>
+     * <li>the leaf local rearrange of every other input is replaced with a
+     * {@link POBloomFilterRearrangeTez}, which receives the filters from the
+     * new vertex over a broadcast edge.</li>
+     * </ol>
+     *
+     * @param op the join package; its packager supplies the inner flags and the join key type
+     * @param curTezOp the join vertex, used to look up its inputs in {@code inputsMap}
+     *        (note that this parameter shadows the field of the same name)
+     * @throws PlanException if the configured bloom join strategy is neither
+     *         {@code map} nor {@code reduce}, or when a plan operation invoked here raises one
+     */
+    private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException {
+
+        List<TezOperator> inputs = inputsMap.get(curTezOp);
+        TezOperator buildBloomOp;
+        List<TezOperator> applyBloomOps = new ArrayList<>();
+
+        String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
+        boolean createBloomInMap = "map".equals(strategy);
+        // Constant-first comparison so that an unexpected null value is reported
+        // through the PlanException below rather than as a NullPointerException
+        if (!createBloomInMap && !"reduce".equals(strategy)) {
+            throw new PlanException(new IllegalArgumentException(
+                    "Invalid value for "
+                            + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " - "
+                            + strategy + ". Valid values are map and reduce"));
+        }
+        int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
+        int vectorSizeBytes = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
+        int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf);
+        int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
+
+        // We build bloom of the right most input and apply the bloom filter on the left inputs by default.
+        // But in case of left outer join we build bloom of the left input and use it on the right input
+        boolean[] inner = op.getPkgr().getInner();
+        boolean skipNullKeys;
+        if (inner[inner.length - 1]) { // inner has from right to left while inputs has from left to right
+            buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input
+            for (int i = 0; i < (inner.length - 1); i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+            skipNullKeys = inner[0];
+        } else {
+            // Left outer join
+            skipNullKeys = false;
+            buildBloomOp = inputs.get(0); // Bloom filter is built from left most input
+            for (int i = 1; i < inner.length; i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+        }
+
+        // Add BuildBloom operator to the input
+        POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0);
+        POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
+        bbr.setSkipNullKeys(skipNullKeys);
+        buildBloomOp.plan.remove(lr);
+        buildBloomOp.plan.addAsLeaf(bbr);
+
+        // Add a new reduce vertex that will construct the final bloom filter
+        // - by combining the bloom filters from the buildBloomOp input tasks in the map strategy
+        // - or directly from the keys from the buildBloomOp input tasks in the reduce strategy
+        TezOperator combineBloomOp = getTezOp();
+        tezPlan.add(combineBloomOp);
+        combineBloomOp.markBuildBloom();
+        // Explicitly set the parallelism for the new vertex to number of bloom filters.
+        // Auto parallelism will bring it down based on the actual output size
+        combineBloomOp.setEstimatedParallelism(numBloomFilters);
+        // We don't want parallelism to be changed during the run by grace auto parallelism
+        // It will take the whole input size and estimate way higher
+        combineBloomOp.setDontEstimateParallelism(true);
+
+        String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
+        TezEdgeDescriptor edge = new TezEdgeDescriptor();
+        TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
+        bbr.setBloomOutputKey(combineBloomOpKey);
+
+        // Plan of the new vertex: package the incoming values per integer key
+        // and hand the result to a value-only output
+        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
+        pkg.setNumInps(1);
+        BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+        pkgr.setKeyType(DataType.INTEGER);
+        pkg.setPkgr(pkgr);
+        POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
+        combineBloomOp.plan.addAsLeaf(pkg);
+        combineBloomOp.plan.addAsLeaf(combineBloomOutput);
+
+        edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+        edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+
+        // Add combiner as well.
+        POPackage combinerPkg = new POPackage(OperatorKey.genOpKey(scope));
+        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+        combinerPkgr.setCombiner(true);
+        combinerPkgr.setKeyType(DataType.INTEGER);
+        combinerPkg.setPkgr(combinerPkgr);
+        combinerPkg.setNumInps(1);
+        edge.combinePlan.addAsLeaf(combinerPkg);
+        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+        prjKey.setResultType(DataType.INTEGER);
+        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(prjKey);
+        clrInps.add(pp);
+        POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+        clr.setOutputKey(combineBloomOpKey);
+        edge.combinePlan.addAsLeaf(clr);
+
+        if (createBloomInMap) {
+            // No combiner needed on map as there will be only one bloom filter per map for each partition
+            // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager
+            edge.setCombinerInMap(false);
+            edge.setCombinerInReducer(true);
+        } else {
+            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+            // Do distinct of the keys on the map side to reduce data sent to reducers.
+            // In case of reduce, not adding a combiner and doing the distinct during reduce itself.
+            // If needed one can be added later
+            edge.setCombinerInMap(true);
+            edge.setCombinerInReducer(false);
+        }
+
+        // Broadcast the final bloom filter to other inputs
+        for (TezOperator applyBloomOp : applyBloomOps) {
+            applyBloomOp.markFilterBloom();
+            POLocalRearrangeTez applyLr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
+            POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(applyLr, numBloomFilters);
+            applyBloomOp.plan.remove(applyLr);
+            applyBloomOp.plan.addAsLeaf(bfr);
+            bfr.setInputKey(combineBloomOpKey);
+            // Each filtered input gets its own broadcast edge from the bloom vertex
+            TezEdgeDescriptor broadcastEdge = new TezEdgeDescriptor();
+            broadcastEdge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+            broadcastEdge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+            TezCompilerUtil.configureValueOnlyTupleOutput(broadcastEdge, DataMovementType.BROADCAST);
+            TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, broadcastEdge);
+            combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
+        }
+
+    }
+
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
try{
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Thu Jan 26 17:40:35 2017
@@ -31,9 +31,13 @@ import org.apache.tez.runtime.library.ou
* Descriptor for Tez edge. It holds combine plan as well as edge properties.
*/
public class TezEdgeDescriptor implements Serializable {
- // Combiner runs on both input and output of Tez edge.
- transient public PhysicalPlan combinePlan;
+
+ public transient PhysicalPlan combinePlan;
private boolean needsDistinctCombiner;
+ // Combiner runs on both input and output of Tez edge by default
+ // It can be configured to run only in output(map) or input(reduce)
+ private Boolean combinerInMap;
+ private Boolean combinerInReducer;
public String inputClassName;
public String outputClassName;
@@ -74,6 +78,22 @@ public class TezEdgeDescriptor implement
needsDistinctCombiner = nic;
}
+    /**
+     * Returns the flag controlling whether the combiner runs on the output
+     * (map) side of this edge.
+     *
+     * @return the configured flag, or {@code null} if it was never set
+     *         (presumably meaning "use the default" - confirm against the consumer)
+     */
+    public Boolean getCombinerInMap() {
+        return this.combinerInMap;
+    }
+
+    /**
+     * Sets the flag controlling whether the combiner runs on the output
+     * (map) side of this edge.
+     *
+     * @param combinerInMap the new flag value; stored as given, including {@code null}
+     */
+    public void setCombinerInMap(Boolean combinerInMap) {
+        final Boolean runInMap = combinerInMap;
+        this.combinerInMap = runInMap;
+    }
+
+    /**
+     * Returns the flag controlling whether the combiner runs on the input
+     * (reduce) side of this edge.
+     *
+     * @return the configured flag, or {@code null} if it was never set
+     *         (presumably meaning "use the default" - confirm against the consumer)
+     */
+    public Boolean getCombinerInReducer() {
+        return this.combinerInReducer;
+    }
+
+    /**
+     * Sets the flag controlling whether the combiner runs on the input
+     * (reduce) side of this edge.
+     *
+     * @param combinerInReducer the new flag value; stored as given, including {@code null}
+     */
+    public void setCombinerInReducer(Boolean combinerInReducer) {
+        final Boolean runInReducer = combinerInReducer;
+        this.combinerInReducer = runInReducer;
+    }
+
public boolean isUseSecondaryKey() {
return useSecondaryKey;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Thu Jan 26 17:40:35 2017
@@ -181,7 +181,11 @@ public class TezOperator extends Operato
// Indicate if this job is a native job
NATIVE,
// Indicate if this job does rank counter
- RANK_COUNTER;
+ RANK_COUNTER,
+ // Indicate if this job constructs bloom filter
+ BUILDBLOOM,
+ // Indicate if this job applies bloom filter
+ FILTERBLOOM;
};
// Features in the job/vertex. Mostly will be only one feature.
@@ -453,6 +457,22 @@ public class TezOperator extends Operato
feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
}
+    /**
+     * Tells whether this operator carries the {@code BUILDBLOOM} feature,
+     * i.e. whether it constructs a bloom filter.
+     *
+     * @return {@code true} if the feature entry for {@code BUILDBLOOM} is set
+     */
+    public boolean isBuildBloom() {
+        final int featureIndex = OPER_FEATURE.BUILDBLOOM.ordinal();
+        return feature.get(featureIndex);
+    }
+
+    /**
+     * Marks this operator with the {@code BUILDBLOOM} feature, i.e. as one
+     * that constructs a bloom filter.
+     */
+    public void markBuildBloom() {
+        final int featureIndex = OPER_FEATURE.BUILDBLOOM.ordinal();
+        feature.set(featureIndex);
+    }
+
+    /**
+     * Tells whether this operator carries the {@code FILTERBLOOM} feature,
+     * i.e. whether it applies a bloom filter.
+     *
+     * @return {@code true} if the feature entry for {@code FILTERBLOOM} is set
+     */
+    public boolean isFilterBloom() {
+        final int featureIndex = OPER_FEATURE.FILTERBLOOM.ordinal();
+        return feature.get(featureIndex);
+    }
+
+    /**
+     * Marks this operator with the {@code FILTERBLOOM} feature, i.e. as one
+     * that applies a bloom filter.
+     */
+    public void markFilterBloom() {
+        final int featureIndex = OPER_FEATURE.FILTERBLOOM.ordinal();
+        feature.set(featureIndex);
+    }
+
public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
for (OPER_FEATURE opf : OPER_FEATURE.values()) {
if (excludeFeatures != null && excludeFeatures.contains(opf)) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Thu Jan 26 17:40:35 2017
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -161,7 +162,7 @@ public class TezPOPackageAnnotator exten
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
- if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
+ if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
return;
}
loRearrangeFound++;
@@ -180,7 +181,9 @@ public class TezPOPackageAnnotator exten
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
- Integer index = Integer.valueOf(lrearrange.getIndex());
+ // For BloomPackager there is only one input, but the
+ // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero
+ Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex());
if(keyInfo.get(index) != null) {
if (isPOSplit) {
// Case of POSplit having more than one input in case of self join or union
@@ -197,12 +200,20 @@ public class TezPOPackageAnnotator exten
}
- keyInfo.put(index,
- new Pair<Boolean, Map<Integer, Integer>>(
- lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
- pkg.getPkgr().setKeyInfo(keyInfo);
- pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
- pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+ if (pkg.getPkgr() instanceof BloomPackager ) {
+ keyInfo.put(index,
+ new Pair<Boolean, Map<Integer, Integer>>(
+ Boolean.FALSE, new HashMap<Integer, Integer>()));
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ } else {
+ keyInfo.put(index,
+ new Pair<Boolean, Map<Integer, Integer>>(
+ lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+ pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+ }
+
}
/**
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1780431&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Thu Jan 26 17:40:35 2017
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Packager that produces the bloom filters (or the distinct keys they are
+ * built from) for a bloom join.
+ * <p>
+ * Depending on how it is configured, {@link #getNext()} does one of three things
+ * with the single input bag attached for the current key:
+ * <ul>
+ * <li><b>bloom created in map</b> (combiner or reducer): the bag holds serialized
+ * bloom filters, which are OR-ed together into one filter;</li>
+ * <li><b>bloom created in reduce, combiner</b>: the bag holds join keys; the
+ * distinct keys are emitted one per call;</li>
+ * <li><b>bloom created in reduce, reducer</b>: the bag holds join keys; a new
+ * bloom filter is built from the distinct keys.</li>
+ * </ul>
+ * Every emitted tuple has two fields: the attached key (the bloom filter
+ * partition) and either the serialized filter or a single join key.
+ */
+public class BloomPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private boolean bloomCreatedInMap;
+    private int vectorSizeBytes;
+    private int numHash;
+    private int hashType;
+    private byte bloomKeyType;
+    private boolean isCombiner;
+
+    // Serialization buffer, allocated on first use and reset before every write
+    private transient ByteArrayOutputStream baos;
+    // Pending distinct keys of the current input while the combiner emits them one by one
+    private transient Iterator<Object> distinctKeyIter;
+
+    /**
+     * @param bloomCreatedInMap {@code true} if the incoming values are already
+     *        serialized bloom filters, {@code false} if they are join keys
+     * @param vectorSizeBytes size of a bloom filter bit vector, in bytes
+     * @param numHash number of hash functions of the bloom filter
+     * @param hashType hash type passed to the {@link BloomFilter} constructor
+     */
+    public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super();
+        this.bloomCreatedInMap = bloomCreatedInMap;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    /**
+     * Sets the Pig data type used to convert join keys to bytes when a bloom
+     * filter is built from keys.
+     */
+    public void setBloomKeyType(byte keyType) {
+        bloomKeyType = keyType;
+    }
+
+    /** Selects combiner behaviour ({@code true}) or reducer behaviour ({@code false}). */
+    public void setCombiner(boolean isCombiner) {
+        this.isCombiner = isCombiner;
+    }
+
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
+    /**
+     * Produces the next output tuple for the attached input, see the class
+     * description for the three modes.
+     *
+     * @return a {@code STATUS_OK} result carrying a two-field tuple, or a
+     *         {@code STATUS_EOP} result when there is nothing (more) to emit
+     * @throws ExecException if reading the input or serializing the filter fails
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        try {
+            if (!bloomCreatedInMap && isCombiner) {
+                return getDistinctBloomKeys();
+            }
+            if (bags == null) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+            // Same function for combiner and reducer when bloom is created in map
+            return bloomCreatedInMap ? combineBloomFilters() : createBloomFilter();
+        } catch (IOException e) {
+            throw new ExecException("Error while constructing final bloom filter", e);
+        }
+    }
+
+    /** ORs all serialized bloom filters of the attached bag into a single filter. */
+    private Result combineBloomFilters() throws IOException {
+        // We get a bag of bloom filters. combine them into one
+        Iterator<Tuple> iter = bags[0].iterator();
+        Tuple tup = iter.next();
+        DataByteArray bloomBytes = (DataByteArray) tup.get(0);
+        BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+        while (iter.hasNext()) {
+            tup = iter.next();
+            bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        // The first input's length is used as the initial buffer capacity
+        return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
+    }
+
+    /** Builds a new bloom filter from the distinct join keys of the attached bag. */
+    private Result createBloomFilter() throws IOException {
+        // We get a bag of keys. Create a bloom filter from them
+        // First do distinct of the keys. Not using DistinctBag as memory should not be a problem.
+        HashSet<Object> bloomKeys = collectDistinctKeys();
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        // BloomFilter takes its vector size in bits
+        BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+        for (Object bloomKey : bloomKeys) {
+            Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
+            bloomFilter.add(k);
+        }
+        bloomKeys = null; // Let the key set be reclaimed before serializing
+        return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64);
+    }
+
+    /**
+     * Serializes the filter and wraps it, together with its partition, in an OK result.
+     *
+     * @param serializedSize initial capacity of the serialization buffer; only
+     *        used the first time the buffer is allocated
+     */
+    private Result getSerializedBloomFilter(Object partition,
+            BloomFilter bloomFilter, int serializedSize) throws ExecException,
+            IOException {
+        if (baos == null) {
+            baos = new ByteArrayOutputStream(serializedSize);
+        }
+        baos.reset();
+        DataOutputStream dos = new DataOutputStream(baos);
+        bloomFilter.write(dos);
+        dos.flush();
+
+        Tuple res = mTupleFactory.newTuple(2);
+        res.set(0, partition);
+        res.set(1, new DataByteArray(baos.toByteArray()));
+        return okResult(res);
+    }
+
+    /**
+     * Emits the distinct join keys of the attached bag, one tuple per call,
+     * followed by a single {@code STATUS_EOP} result.
+     */
+    private Result getDistinctBloomKeys() throws ExecException {
+        if (distinctKeyIter == null) {
+            distinctKeyIter = collectDistinctKeys().iterator();
+        }
+        if (distinctKeyIter.hasNext()) {
+            Tuple res = mTupleFactory.newTuple(2);
+            res.set(0, key);
+            res.set(1, distinctKeyIter.next());
+            return okResult(res);
+        }
+        // All keys emitted; start afresh on the next call
+        distinctKeyIter = null;
+        return new Result(POStatus.STATUS_EOP, null);
+    }
+
+    /** Collects the first field of every tuple in the attached bag into a set. */
+    private HashSet<Object> collectDistinctKeys() throws ExecException {
+        HashSet<Object> distinctKeys = new HashSet<>();
+        Iterator<Tuple> iter = bags[0].iterator();
+        while (iter.hasNext()) {
+            distinctKeys.add(iter.next().get(0));
+        }
+        return distinctKeys;
+    }
+
+    /** Wraps a tuple in a {@code STATUS_OK} result. */
+    private static Result okResult(Tuple tuple) {
+        Result r = new Result();
+        r.result = tuple;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1780431&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Thu Jan 26 17:40:35 2017
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
+ private static final long serialVersionUID = 1L;
+
+ private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
+ private String inputKey;
+ private transient KeyValueReader reader;
+ private transient String cacheKey;
+ private int numBloomFilters;
+ private transient BloomFilter[] bloomFilters;
+
+ public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
+ super(lr);
+ this.numBloomFilters = numBloomFilters;
+ }
+
+ public void setInputKey(String inputKey) {
+ this.inputKey = inputKey;
+ }
+
+ @Override
+ public String[] getTezInputs() {
+ return new String[] { inputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(inputKey)) {
+ inputKey = newInputKey;
+ }
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ cacheKey = "bloom-" + inputKey;
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ inputsToSkip.add(inputKey);
+ }
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException {
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ bloomFilters = (BloomFilter[]) cacheValue;
+ return;
+ }
+ LogicalInput input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + inputKey + " is missing");
+ }
+ try {
+ reader = (KeyValueReader) input.getReader();
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ while (reader.next()) {
+ if (bloomFilters == null) {
+ bloomFilters = new BloomFilter[numBloomFilters];
+ }
+ Tuple val = (Tuple) reader.getCurrentValue();
+ int index = (int) val.get(0);
+ bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+ }
+ ObjectCache.getInstance().cache(cacheKey, bloomFilters);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+
+ // If there is no bloom filter, then it means right input was empty
+ // Skip processing
+ if (bloomFilters == null) {
+ return RESULT_EOP;
+ }
+
+ while (true) {
+ res = super.getRearrangedTuple();
+ try {
+ switch (res.returnStatus) {
+ case POStatus.STATUS_OK:
+ if (illustrator == null) {
+ Tuple result = (Tuple) res.result;
+ Byte index = (Byte) result.get(0);
+
+ // Skip the record if key is not in the bloom filter
+ if (!isKeyInBloomFilter(result.get(1))) {
+ continue;
+ }
+ PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+ NullableTuple val = new NullableTuple((Tuple)result.get(2));
+ key.setIndex(index);
+ val.setIndex(index);
+ writer.write(key, val);
+ } else {
+ illustratorMarkup(res.result, res.result, 0);
+ }
+ continue;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ case POStatus.STATUS_ERR:
+ default:
+ return res;
+ }
+ } catch (IOException ioe) {
+ int errCode = 2135;
+ String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage();
+ throw new ExecException(msg, errCode, ioe);
+ }
+ }
+ }
+
+ private boolean isKeyInBloomFilter(Object key) throws ExecException {
+ if (key == null) {
+ // Null values are dropped in a inner join and in the case of outer join,
+ // POBloomFilterRearrangeTez is only in the plan on the non outer relation.
+ // So just skip them
+ return false;
+ }
+ if (bloomFilters.length == 1) {
+ // Skip computing hashcode
+ Key k = new Key(DataType.toBytes(key, keyType));
+ return bloomFilters[0].membershipTest(k);
+ } else {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ BloomFilter filter = bloomFilters[partition];
+ if (filter != null) {
+ Key k = new Key(DataType.toBytes(key, keyType));
+ return filter.membershipTest(k);
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
+ return (POBloomFilterRearrangeTez) super.clone();
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "BloomFilter Rearrange" + "["
+ + DataType.findTypeName(resultType) + "]" + "{"
+ + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1780431&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Thu Jan 26 17:40:35 2017
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * This operator writes out the key value for the hash join reduce operation similar to POLocalRearrangeTez.
+ * In addition, it also writes out the bloom filter constructed from the join keys
+ * in the case of bloomjoin map strategy or join keys themselves in case of reduce strategy.
+ *
+ * Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
+ * It also allows us to have lower false positives with smaller vector sizes.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
+
+ // Strategy assumed by getNumBloomFilters when PIG_BLOOMJOIN_STRATEGY is not set
+ public static final String DEFAULT_BLOOM_STRATEGY = "map";
+ // Number of filters used by getNumBloomFilters when the strategy is not "map"
+ // and PIG_BLOOMJOIN_NUM_FILTERS is not set
+ public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11;
+ // NOTE(review): the next three defaults are not read in this class;
+ // presumably they are applied by whoever constructs this operator - confirm
+ public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3;
+ public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur";
+ public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024;
+
+ // Key of the output that receives the bloom data (looked up in attachOutputs)
+ private String bloomOutputKey;
+ // When true, getNextTuple does not write tuples whose join key is null
+ private boolean skipNullKeys = false;
+ // true: add keys to local filters and write the filters at end of all input;
+ // false: write every join key to the bloom output instead
+ private boolean createBloomInMap;
+ // Number of partitions the keys are spread over by hash code
+ private int numBloomFilters;
+ // Size of each filter in bytes; multiplied by 8 when a BloomFilter is created
+ private int vectorSizeBytes;
+ // Passed to the BloomFilter constructor together with the vector size
+ private int numHash;
+ private int hashType;
+
+ // Runtime state; bloomFilters, bloomWriter, bloomValue and bloomNullableTuple are
+ // initialised in attachOutputs, nullKey is created lazily in getNextTuple
+ private transient BloomFilter[] bloomFilters;
+ private transient KeyValueWriter bloomWriter;
+ private transient PigNullableWritable nullKey;
+ private transient Tuple bloomValue;
+ private transient NullableTuple bloomNullableTuple;
+
+ /**
+ * Creates the operator from an existing local rearrange plus the bloom settings.
+ *
+ * @param lr the local rearrange handed to the superclass constructor
+ * @param createBloomInMap true to build the filters in this operator, false to
+ * write the join keys to the bloom output
+ * @param numBloomFilters number of filters the keys are partitioned over
+ * @param vectorSizeBytes size of each filter in bytes
+ * @param numHash passed to the BloomFilter constructor
+ * @param hashType passed to the BloomFilter constructor
+ */
+ public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
+ boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
+ int numHash, int hashType) {
+ super(lr);
+ // Bloom filter shape
+ this.numBloomFilters = numBloomFilters;
+ this.vectorSizeBytes = vectorSizeBytes;
+ this.hashType = hashType;
+ this.numHash = numHash;
+ // Where the filters get built
+ this.createBloomInMap = createBloomInMap;
+ }
+
+ public static int getNumBloomFilters(Configuration conf) {
+ if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) {
+ return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1);
+ } else {
+ return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE);
+ }
+ }
+
+ /**
+ * Sets whether tuples with a null join key are skipped instead of written.
+ *
+ * @param skipNullKeys true to skip tuples whose key is null
+ */
+ public void setSkipNullKeys(boolean skipNullKeys) {
+ this.skipNullKeys = skipNullKeys;
+ }
+
+ /**
+ * Sets the key of the output that the bloom data is written to.
+ *
+ * @param bloomOutputKey key used to look up the bloom output in attachOutputs
+ */
+ public void setBloomOutputKey(String bloomOutputKey) {
+ this.bloomOutputKey = bloomOutputKey;
+ }
+
+ /**
+ * Checks whether this operator writes to the given output.
+ *
+ * @param key the output key to look for
+ * @return true if the superclass reports the key as its output or if the key
+ * equals the bloom output key; false otherwise, including when no
+ * bloom output key has been set yet
+ */
+ @Override
+ public boolean containsOutputKey(String key) {
+ if (super.containsOutputKey(key)) {
+ return true;
+ }
+ // bloomOutputKey is only assigned through setBloomOutputKey, so it may
+ // still be null here; treat that as "no match" instead of throwing
+ return bloomOutputKey != null && bloomOutputKey.equals(key);
+ }
+
+ @Override
+ public String[] getTezOutputs() {
+ return new String[] { outputKey, bloomOutputKey };
+ }
+
+ /**
+ * Renames one of this operator's outputs. The regular output key is checked
+ * first; the bloom output key is only considered when that does not match.
+ *
+ * @param oldOutputKey the key currently in use
+ * @param newOutputKey the key to use instead
+ */
+ @Override
+ public void replaceOutput(String oldOutputKey, String newOutputKey) {
+ if (oldOutputKey.equals(outputKey)) {
+ outputKey = newOutputKey;
+ return;
+ }
+ if (oldOutputKey.equals(bloomOutputKey)) {
+ bloomOutputKey = newOutputKey;
+ }
+ }
+
+ /**
+ * Attaches the regular output through the superclass, then resolves the
+ * writer of the bloom output and prepares the reusable bloom state.
+ *
+ * @param outputs the logical outputs keyed by output key
+ * @param conf the configuration passed on to the superclass
+ * @throws ExecException if the bloom output is missing or its writer cannot be obtained
+ */
+ @Override
+ public void attachOutputs(Map<String, LogicalOutput> outputs,
+ Configuration conf) throws ExecException {
+ super.attachOutputs(outputs, conf);
+
+ final LogicalOutput bloomOutput = outputs.get(bloomOutputKey);
+ if (null == bloomOutput) {
+ throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
+ }
+
+ try {
+ bloomWriter = (KeyValueWriter) bloomOutput.getWriter();
+ String attached = "Attached output to vertex " + bloomOutputKey
+ + " : output=" + bloomOutput + ", writer=" + bloomWriter;
+ LOG.info(attached);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+
+ // One slot per partition; the single-field tuple and its wrapper are
+ // reused for every record written to the bloom output
+ bloomFilters = new BloomFilter[numBloomFilters];
+ bloomValue = mTupleFactory.newTuple(1);
+ bloomNullableTuple = new NullableTuple(bloomValue);
+ }
+
+ /**
+ * Drains the rearranged tuples from the superclass, writing each key/value
+ * pair to the regular output and recording every non-null key for the bloom
+ * output. The loop only ends when a status other than OK or NULL is seen.
+ *
+ * @return the result whose status ended the loop (EOP, ERR or any other
+ * status that is not OK or NULL)
+ * @throws ExecException with error code 2135 when an IOException is raised
+ * while processing a tuple
+ */
+ @Override
+ public Result getNextTuple() throws ExecException {
+
+ PigNullableWritable key;
+ while (true) {
+ res = super.getRearrangedTuple();
+ try {
+ switch (res.returnStatus) {
+ case POStatus.STATUS_OK:
+ if (illustrator == null) {
+ // Rearranged tuple layout as read here: 0 = index, 1 = key, 2 = value tuple
+ Tuple result = (Tuple) res.result;
+ Byte index = (Byte) result.get(0);
+
+ Object keyObj = result.get(1);
+ if (keyObj != null) {
+ key = HDataType.getWritableComparableTypes(keyObj, keyType);
+ // null keys cannot be part of bloom filter
+ // Since they are also dropped during join we can skip them
+ if (createBloomInMap) {
+ addKeyToBloomFilter(keyObj);
+ } else {
+ writeJoinKeyForBloom(keyObj);
+ }
+ } else if (skipNullKeys) {
+ // Inner join. So don't bother writing null key
+ continue;
+ } else {
+ // The writable for a null key is created once and reused
+ if (nullKey == null) {
+ nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
+ }
+ key = nullKey;
+ }
+
+ NullableTuple val = new NullableTuple((Tuple)result.get(2));
+ key.setIndex(index);
+ val.setIndex(index);
+ writer.write(key, val);
+ } else {
+ illustratorMarkup(res.result, res.result, 0);
+ }
+ continue;
+ case POStatus.STATUS_NULL:
+ continue;
+ case POStatus.STATUS_EOP:
+ if (this.parentPlan.endOfAllInput && createBloomInMap) {
+ // In case of Split will get EOP after every record.
+ // So check for endOfAllInput
+ writeBloomFilters();
+ }
+ // No break: falls through so that EOP is returned like ERR and default
+ case POStatus.STATUS_ERR:
+ default:
+ return res;
+ }
+ } catch (IOException ioe) {
+ int errCode = 2135;
+ String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage();
+ throw new ExecException(msg, errCode, ioe);
+ }
+ }
+ }
+
+ /**
+ * Adds a join key to the bloom filter of its partition, creating that
+ * filter on first use.
+ *
+ * @param key the non-null join key to add
+ * @throws ExecException if thrown while converting the key to bytes
+ */
+ private void addKeyToBloomFilter(Object key) throws ExecException {
+ Key bloomKey = new Key(DataType.toBytes(key, keyType));
+ // With a single filter the hashcode is not needed to pick the slot
+ int slot = bloomFilters.length == 1
+ ? 0
+ : (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ if (bloomFilters[slot] == null) {
+ bloomFilters[slot] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+ }
+ bloomFilters[slot].add(bloomKey);
+ }
+
+ private void writeJoinKeyForBloom(Object key) throws IOException {
+ int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+ bloomValue.set(0, key);
+ bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+ }
+
+ /**
+ * Serializes every bloom filter that was created and writes it to the
+ * bloom output, keyed by its partition number. Partitions without a
+ * filter are skipped.
+ *
+ * @throws IOException if serializing a filter or writing it out fails
+ */
+ private void writeBloomFilters() throws IOException {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream(vectorSizeBytes + 64);
+ DataOutputStream out = new DataOutputStream(buffer);
+ for (int partition = 0; partition < bloomFilters.length; partition++) {
+ BloomFilter filter = bloomFilters[partition];
+ if (filter == null) {
+ continue;
+ }
+ filter.write(out);
+ out.flush();
+ bloomValue.set(0, new DataByteArray(buffer.toByteArray()));
+ bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+ // Empty the buffer so that the next filter starts from scratch
+ buffer.reset();
+ }
+ }
+
+ @Override
+ public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
+ return (POBuildBloomRearrangeTez) super.clone();
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "BuildBloom Rearrange" + "["
+ + DataType.findTypeName(resultType) + "]" + "{"
+ + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Thu Jan 26 17:40:35 2017
@@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends
}
}
- public String getOutputKey() {
- return outputKey;
+ public boolean containsOutputKey(String key) {
+ return outputKey.equals(key);
}
public void setOutputKey(String outputKey) {
@@ -122,6 +122,10 @@ public class POLocalRearrangeTez extends
}
}
+ protected Result getRearrangedTuple() throws ExecException {
+ return super.getNextTuple();
+ }
+
@Override
public Result getNextTuple() throws ExecException {
res = super.getNextTuple();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Thu Jan 26 17:40:35 2017
@@ -129,7 +129,9 @@ public class POShuffleTezLoad extends PO
finished[i] = !readers.get(i).next();
}
- this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+ this.readOnceOneBag = (numInputs == 1)
+ && (pkgr instanceof CombinerPackager
+ || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
if (readOnceOneBag) {
readOnce[0] = true;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Thu Jan 26 17:40:35 2017
@@ -69,6 +69,11 @@ public class CombinerOptimizer extends T
}
for (TezOperator from : predecessors) {
+ PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+ if (!combinePlan.isEmpty()) {
+ // Cases like bloom join have combine plan already set
+ continue;
+ }
List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
if (rearranges.isEmpty()) {
continue;
@@ -77,7 +82,7 @@ public class CombinerOptimizer extends T
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
@@ -90,7 +95,6 @@ public class CombinerOptimizer extends T
// Detected the POLocalRearrange -> POPackage pattern. Let's add
// combiner if possible.
- PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
if(!combinePlan.isEmpty()) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Thu Jan 26 17:40:35 2017
@@ -123,8 +123,8 @@ public class ParallelismSetter extends T
boolean overrideRequestedParallelism = false;
if (parallelism != -1
&& autoParallelismEnabled
- && tezOp.isIntermediateReducer()
&& !tezOp.isDontEstimateParallelism()
+ && tezOp.isIntermediateReducer()
&& tezOp.isOverrideIntermediateParallelism()) {
overrideRequestedParallelism = true;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Thu Jan 26 17:40:35 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
POLocalRearrangeTez connectingLR = null;
PhysicalPlan rearrangePlan = from.plan;
for (POLocalRearrangeTez lr : rearranges) {
- if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+ if (lr.containsOutputKey(to.getOperatorKey().toString())) {
connectingLR = lr;
break;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Thu Jan 26 17:40:35 2017
@@ -30,6 +30,8 @@ public class TezEstimatedParallelismClea
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
- tezOp.setEstimatedParallelism(-1);
+ if (!tezOp.isDontEstimateParallelism()) {
+ tezOp.setEstimatedParallelism(-1);
+ }
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Thu Jan 26 17:40:35 2017
@@ -97,16 +97,16 @@ public class TezOperDependencyParallelis
bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+ // If we have already estimated parallelism, use that one
+ if (tezOper.getEstimatedParallelism() != -1) {
+ return tezOper.getEstimatedParallelism();
+ }
+
// If parallelism is set explicitly, respect it
if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
return tezOper.getRequestedParallelism();
}
- // If we have already estimated parallelism, use that one
- if (tezOper.getEstimatedParallelism()!=-1) {
- return tezOper.getEstimatedParallelism();
- }
-
List<TezOperator> preds = plan.getPredecessors(tezOper);
if (preds==null) {
throw new IOException("Cannot estimate parallelism for source vertex");
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Thu Jan 26 17:40:35 2017
@@ -38,6 +38,7 @@ public class LOJoin extends LogicalRelat
*/
public static enum JOINTYPE {
HASH, // Hash Join
+ BLOOM, // Bloom Join
REPLICATED, // Fragment Replicated join
SKEWED, // Skewed Join
MERGE, // Sort Merge Join