You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/26 17:40:36 UTC

svn commit: r1780431 [1/2] - in /pig/trunk: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/o...

Author: rohini
Date: Thu Jan 26 17:40:35 2017
New Revision: 1780431

URL: http://svn.apache.org/viewvc?rev=1780431&view=rev
Log:
PIG-4963: Add a Bloom join (rohini)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
    pig/trunk/test/e2e/pig/tests/join.conf
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
    pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
    pig/trunk/test/e2e/pig/build.xml
    pig/trunk/test/e2e/pig/tests/multiquery.conf
    pig/trunk/test/e2e/pig/tests/orc.conf
    pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 26 17:40:35 2017
@@ -34,6 +34,8 @@ PIG-5067: Revisit union on numeric type
  
 IMPROVEMENTS
 
+PIG-4963: Add a Bloom join (rohini)
+
 PIG-3938: Add LoadCaster to EvalFunc (knoguchi)
 
 PIG-5105: Tez unit tests failing with "Argument list too long" (rohini)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Thu Jan 26 17:40:35 2017
@@ -6955,7 +6955,7 @@ public class SimpleCustomPartitioner ext
    <table>
       <tr> 
             <td>
-               <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];  </p>
+               <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'bloom' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];  </p>
             </td>
          </tr> 
    </table></section>
@@ -7004,7 +7004,16 @@ public class SimpleCustomPartitioner ext
                <p>Use to perform replicated joins (see <a href="perf.html#replicated-joins">Replicated Joins</a>).</p>
             </td>
          </tr>
-         
+
+         <tr>
+            <td>
+               <p>'bloom'</p>
+            </td>
+            <td>
+               <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+            </td>
+         </tr>
+
          <tr>
             <td>
                <p>'skewed'</p>
@@ -7142,7 +7151,7 @@ DUMP X;
       <tr> 
             <td>
                <p>alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column 
-               [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];  </p>
+               [USING 'replicated' | 'bloom' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];  </p>
             </td>
          </tr> 
    </table>
@@ -7213,7 +7222,7 @@ DUMP X;
             </td>
          </tr>
 
-  <tr>
+         <tr>
             <td>
                <p>USING</p>
             </td>
@@ -7230,8 +7239,18 @@ DUMP X;
                <p>Only left outer join is supported for replicated joins.</p>
             </td>
          </tr>
-         
-                  <tr>
+
+         <tr>
+            <td>
+               <p>'bloom'</p>
+            </td>
+            <td>
+               <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+               <p>Full outer join is not supported for bloom joins.</p>
+            </td>
+         </tr>
+
+         <tr>
             <td>
                <p>'skewed'</p>
             </td>
@@ -7324,6 +7343,13 @@ B = LOAD 'tiny';
 C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';
 </source>
 
+<p>This example shows a bloom right outer join.</p>
+<source>
+A = LOAD 'large';
+B = LOAD 'small';
+C= JOIN A BY $0 RIGHT, B BY $0 USING 'bloom';
+</source>
+
 <p>This example shows a skewed full outer join.</p>
 <source>
 A = LOAD 'studenttab' as (name, age, gpa);

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Thu Jan 26 17:40:35 2017
@@ -1202,6 +1202,100 @@ gets 1 GB of memory. Please share your o
 </section>
 <!-- END FRAGMENT REPLICATE JOINS-->
 
+<!-- BLOOM JOINS-->
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="bloom-joins">
+<title>Bloom Joins</title>
+<p>Bloom join is a special type of join where a bloom filter is constructed using join keys of one relation and
+used to filter records of the other relations before doing a regular hash join.
+The amount of data sent to the reducers will be a lot less depending upon the number of records that are filtered on the map side.
+Bloom join is very useful in cases where the number of matching records between relations in a join is small
+compared to the total number of records, allowing many to be filtered before the join.
+Before bloom join was added as a type of join, the same functionality was achieved by users by using
+the <a href="func.html#bloom">builtin bloom udfs</a>, which is not as efficient and requires more lines of code as well.
+Currently bloom join is only implemented in Tez execution mode. Builtin bloom udfs have to be used for other execution modes.</p>
+
+<section>
+<title>Usage</title>
+<p>Perform a bloom join with the USING clause (see <a href="basic.html#join-inner">JOIN (inner)</a> and <a href="basic.html#join-outer">JOIN (outer)</a>).
+In this example, a large relation is joined with two smaller relations. Note that the large relation comes first followed by the smaller relations.
+Bloom filter is built from join keys of the right most relation which is small and the filter is applied on the big and medium relations.
+None of the relations are required to fit into main memory. </p>
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+medium = LOAD 'medium_data' AS (m1,m2,m3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN big BY b1, medium BY m1, small BY s1 USING 'bloom';
+</source>
+
+<p>
+In the case of inner join and right outer join, the right most relation is used for building the bloom filter
+and the users are expected to specify the smaller dataset as the right most relation.
+But in the case of left outer join, the left most relation is used for building the bloom filter and is expected to be the smaller dataset.
+This is because all records of the outer relation should be in the result and no records can be filtered.
+If the left relation turns out to be the bigger dataset, it would not be as efficient to build the bloom filter on the bigger dataset.
+But it might still perform better than a regular join if it is able to filter a lot of records from the right relation.
+</p>
+
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN small BY s1 LEFT, big BY b1 USING 'bloom';
+</source>
+</section>
+
+<section>
+<title>Conditions</title>
+<ul>
+<li>Bloom join cannot be used with a FULL OUTER join.</li>
+<li>If the underlying data is sufficiently skewed, bloom join might not help. Skewed join can be considered for those cases.</li>
+</ul>
+</section>
+
+<section>
+<title>Tuning options</title>
+<p>
+There are multiple <a href="start.html#properties">pig properties</a> that can be configured to construct a more efficient bloom filter.
+See <a href="http://en.wikipedia.org/wiki/Bloom_filter">Bloom Filter</a> for a discussion of how to select the number of bits and the number of hash functions.
+Easier option would be to search for "bloom filter calculator" in a search engine and use one of the online bloom filter calculators available to arrive at the desired values.
+</p>
+<ul>
+<li>pig.bloomjoin.strategy - The valid values for this are 'map' and 'reduce'. Default value is map.
+Bloom join has two different kind of implementations to be more efficient in different cases.
+In general, there is an extra reduce step in the DAG for construction of the bloom filter(s).
+<ul>
+<li>map - In each map, bloom filters are computed on the join keys partitioned by the hashcode of the key
+with pig.bloomjoin.num.filters number of partitions.
+Bloom filters for each partition from different maps are then combined in the reducers producing one bloom filter per partition.
+The default value of pig.bloomjoin.num.filters is 1 for this strategy and so usually only one bloom filter is created.
+This is efficient and fast if there is a small number of maps (&lt;10) and the number of distinct keys is not too high.
+It can be faster with larger number of maps and even with bigger bloom vector sizes,
+ but the amount of data shuffled to the reducer for aggregation becomes huge making it inefficient.</li>
+<li>reduce - Join keys are sent from the map to the reducer partitioned by hashcode of the key with
+pig.bloomjoin.num.filters number of partitions. In the reducers, one bloom filter is then computed per partition.
+Number of reducers are set equal to the number of partitions allowing for each bloom filter to be computed in parallel.
+The default value of pig.bloomjoin.num.filters is 11 for this strategy.
+This is efficient for larger datasets with lot of maps or very large bloom vector size.
+In this case size of keys sent to the reducer is smaller than sending bloom filters to reducer for aggregation making it efficient.</li>
+</ul>
+</li>
+<li>pig.bloomjoin.num.filters - The number of bloom filters that will be created. Default is 1 for map strategy and 11 for reduce strategy.</li>
+<li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
+A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).</li>
+<li>pig.bloomjoin.hash.type - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.functions - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+Higher the number lower the false positives. Too high a value can increase the cpu time. Default value is 3.</li>
+</ul>
+</section>
+
+</section>
+<!-- END BLOOM JOINS-->
+
 <!-- +++++++++++++++++++++++++++++++ -->
 <!-- SKEWED JOINS-->
 <section id="skewed-joins">

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Jan 26 17:40:35 2017
@@ -134,6 +134,58 @@ public class PigConfiguration {
     public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
 
     /**
+     * Bloom join has two different kind of implementations.
+     * <ul>
+     * <li>map <br>
+     * In each map, bloom filters are computed on the join keys partitioned by
+     * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of
+     * partitions. Bloom filters from different maps are then combined in the
+     * reducer producing one bloom filter per partition. This is efficient and
+     * fast if there are smaller number of maps (&lt;10) and the number of
+     * distinct keys are not too high. It can be faster with larger number of
+     * maps and even with bigger bloom vector sizes, but the amount of data
+     * shuffled to the reducer for aggregation becomes huge making it
+     * inefficient.</li>
+     * <li>reduce <br>
+     * Join keys are sent from the map to the reducer partitioned by hashcode of
+     * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One
+     * bloom filter is then created per partition. This is efficient for larger
+     * datasets with lot of maps or very large
+     * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case size of keys sent
+     * to the reducer is smaller than sending bloom filters to reducer for
+     * aggregation making it efficient.</li>
+     * </ul>
+     * Default value is map.
+     */
+    public static final String PIG_BLOOMJOIN_STRATEGY = "pig.bloomjoin.strategy";
+
+    /**
+     * The number of bloom filters that will be created.
+     * Default is 1 for map strategy and 11 for reduce strategy.
+     */
+    public static final String PIG_BLOOMJOIN_NUM_FILTERS = "pig.bloomjoin.num.filters";
+
+    /**
+     * The size in bytes of the bit vector to be used for the bloom filter.
+     * A bigger vector size will be needed when the number of distinct keys is higher.
+     * Default value is 1048576 (1MB).
+     */
+    public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = "pig.bloomjoin.vectorsize.bytes";
+
+    /**
+     * The type of hash function to use. Valid values are jenkins and murmur.
+     * Default is murmur.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_TYPE = "pig.bloomjoin.hash.type";
+
+    /**
+     * The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+     * Higher the number lower the false positives. Too high a value can increase the cpu time.
+     * Default value is 3.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
+
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Jan 26 17:40:35 2017
@@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
-            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN
+                    || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+                // Bloom join is not implemented in mapreduce mode and falls back to regular join
                 curMROp.markRegularJoin();
             } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Jan 26 17:40:35 2017
@@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -160,7 +161,7 @@ public class PigCombiner {
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from
             // POJoinPacakage.getNext()
-            if (pack.getPkgr() instanceof JoinPackager)
+            if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Jan 26 17:40:35 2017
@@ -21,14 +21,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -105,7 +107,7 @@ public class EndOfAllInputSetter extends
         public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
             endOfAllInputFlag = true;
         }
-        
+
         @Override
         public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
             endOfAllInputFlag = true;
@@ -122,6 +124,13 @@ public class EndOfAllInputSetter extends
             }
         }
 
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
+            if (lr instanceof POBuildBloomRearrangeTez) {
+                endOfAllInputFlag = true;
+            }
+            super.visitLocalRearrange(lr);
+        }
 
         /**
          * @return if end of all input is present

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Jan 26 17:40:35 2017
@@ -441,6 +441,10 @@ public abstract class PhysicalOperator e
     public void reset() {
     }
 
+    public boolean isEndOfAllInput() {
+        return parentPlan.endOfAllInput;
+    }
+
     /**
      * @return PigProgressable stored in threadlocal
      */

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Jan 26 17:40:35 2017
@@ -51,13 +51,13 @@ public class Packager implements Illustr
     protected DataBag[] bags;
 
     public static enum PackageType {
-        GROUP, JOIN
+        GROUP, JOIN, BLOOMJOIN
     };
 
     protected transient Illustrator illustrator = null;
 
     // The key being worked on
-    Object key;
+    protected Object key;
 
     // marker to indicate if key is a tuple
     protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@ public class Packager implements Illustr
     protected boolean isKeyCompound = false;
 
     // key's type
-    byte keyType;
+    protected byte keyType;
 
     // The number of inputs to this
     // co-group. 0 indicates a distinct, which means there will only be a

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan 26 17:40:35 2017
@@ -479,7 +479,7 @@ public class TezDagBuilder extends TezOp
                 POLocalRearrangeTez.class);
 
         for (POLocalRearrangeTez lr : lrs) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
                 setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput);
                 // In case of secondary key sort, main key type is the actual key type
@@ -540,26 +540,36 @@ public class TezDagBuilder extends TezOp
 
         UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
         out.setUserPayload(payLoad);
+        in.setUserPayload(payLoad);
 
+        // Remove combiner and reset payload
         if (!combinePlan.isEmpty()) {
             boolean noCombineInReducer = false;
+            boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap();
             String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
-            if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
+            if (edge.getCombinerInReducer() != null) {
+                noCombineInReducer = !edge.getCombinerInReducer();
+            } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) {
                 noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
             } else {
                 noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
             }
-            if (noCombineInReducer) {
+            if (noCombineInReducer || noCombineInMapper) {
                 log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey());
                 conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
                 conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
                 conf.unset("pig.combinePlan");
                 conf.unset("pig.combine.package");
                 conf.unset("pig.map.keytype");
-                payLoad = TezUtils.createUserPayloadFromConf(conf);
+                UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf);
+                if (noCombineInMapper) {
+                    out.setUserPayload(payLoadWithoutCombiner);
+                }
+                if (noCombineInReducer) {
+                    in.setUserPayload(payLoadWithoutCombiner);
+                }
             }
         }
-        in.setUserPayload(payLoad);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -717,7 +727,7 @@ public class TezDagBuilder extends TezOp
                             PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
                         if (lr.isConnectedToPackage()
-                                && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+                                && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int) lr.getIndex(), inputKey);
                             if (isVertexGroup) {
                                 isMergedInput = true;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Thu Jan 26 17:40:35 2017
@@ -32,10 +32,12 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.hash.Hash;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
@@ -44,8 +46,10 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -82,7 +86,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -110,6 +117,7 @@ import org.apache.pig.impl.builtin.GetMe
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -167,6 +175,10 @@ public class TezCompiler extends PhyPlan
 
     private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
 
+    // Contains the inputs to operators like join, with the list maintaining the
+    // same left-to-right order as the join
+    private Map<TezOperator, List<TezOperator>> inputsMap;
+
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
     public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
     public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -175,6 +187,8 @@ public class TezCompiler extends PhyPlan
     private boolean optimisticFileConcatenation = false;
     private List<String> readOnceLoadFuncs = null;
 
+    private Configuration conf;
+
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
     public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -184,6 +198,7 @@ public class TezCompiler extends PhyPlan
         this.pigContext = pigContext;
 
         pigProperties = pigContext.getProperties();
+        conf = ConfigurationUtil.toConfiguration(pigProperties, false);
         splitsSeen = Maps.newHashMap();
         tezPlan = new TezOperPlan();
         nig = NodeIdGenerator.getGenerator();
@@ -197,6 +212,7 @@ public class TezCompiler extends PhyPlan
         scope = roots.get(0).getOperatorKey().getScope();
         localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
         phyToTezOpMap = Maps.newHashMap();
+        inputsMap = Maps.newHashMap();
 
         fileConcatenationThreshold = Integer.parseInt(pigProperties
                 .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -894,6 +910,7 @@ public class TezCompiler extends PhyPlan
     public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
         try {
             blocking();
+            inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs)));
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             if (op.isCross()) {
@@ -1340,6 +1357,9 @@ public class TezCompiler extends PhyPlan
                 } else if (op.getNumInps() > 1) {
                     curTezOp.markCogroup();
                 }
+            } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+                curTezOp.markRegularJoin();
+                addBloomToJoin(op, curTezOp);
             }
         } catch (Exception e) {
             int errCode = 2034;
@@ -1348,6 +1368,132 @@ public class TezCompiler extends PhyPlan
         }
     }
 
+    private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException {
+        // Rewires the join in curTezOp: one input builds the bloom filter, a new vertex combines it, and it is broadcast to filter the other inputs
+        List<TezOperator> inputs = inputsMap.get(curTezOp);
+        TezOperator buildBloomOp;
+        List<TezOperator> applyBloomOps = new ArrayList<>();
+        // Read the bloom join settings from conf, falling back to the POBuildBloomRearrangeTez defaults
+        String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
+        boolean createBloomInMap = "map".equals(strategy);
+        if (!createBloomInMap && !strategy.equals("reduce")) {
+            throw new PlanException(new IllegalArgumentException(
+                    "Invalid value for "
+                            + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " -  "
+                            + strategy + ". Valid values are map and reduce"));
+        }
+        int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
+        int vectorSizeBytes =  conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
+        int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf);
+        int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
+
+        // By default the bloom filter is built from the right-most input and applied to the inputs on its left.
+        // For a left outer join it is instead built from the left-most input and applied to the remaining inputs.
+        boolean[] inner = op.getPkgr().getInner();
+        boolean skipNullKeys = true;
+        if (inner[inner.length - 1]) {  // inner has from right to left while inputs has from left to right
+            buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input
+            for (int i = 0; i < (inner.length - 1); i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+            skipNullKeys = inner[0];
+        } else {
+            // Left outer join
+            skipNullKeys = false;
+            buildBloomOp = inputs.get(0); // Bloom filter is built from left most input
+            for (int i = 1; i < inner.length; i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+        }
+
+        // Replace the leaf POLocalRearrangeTez of the build input with a POBuildBloomRearrangeTez created from it
+        POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0);
+        POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
+        bbr.setSkipNullKeys(skipNullKeys);
+        buildBloomOp.plan.remove(lr);
+        buildBloomOp.plan.addAsLeaf(bbr);
+
+        // Add a new reduce vertex that will construct the final bloom filter
+        //    - by combining the bloom filters from the buildBloomOp input tasks in the map strategy
+        //    - or directly from the keys from the buildBloomOp input tasks in the reduce strategy
+        TezOperator combineBloomOp = getTezOp();
+        tezPlan.add(combineBloomOp);
+        combineBloomOp.markBuildBloom();
+        // Explicitly set the parallelism for the new vertex to the number of bloom filters.
+        // Auto parallelism will bring it down based on the actual output size.
+        combineBloomOp.setEstimatedParallelism(numBloomFilters);
+        // We don't want the parallelism to be changed during the run by grace auto parallelism;
+        // it would take the whole input size into account and estimate far too high.
+        combineBloomOp.setDontEstimateParallelism(true);
+
+        String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
+        TezEdgeDescriptor edge = new TezEdgeDescriptor();
+        TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
+        bbr.setBloomOutputKey(combineBloomOpKey);
+
+        // Plan of the new vertex: a POPackage driven by a BloomPackager, followed by a POValueOutputTez
+        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
+        pkg.setNumInps(1);
+        BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);;
+        pkgr.setKeyType(DataType.INTEGER);
+        pkg.setPkgr(pkgr);
+        POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
+        combineBloomOp.plan.addAsLeaf(pkg);
+        combineBloomOp.plan.addAsLeaf(combineBloomOutput);
+
+        edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+        edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+
+        // Add a combiner as well: a BloomPackager in combiner mode followed by a local rearrange on the integer key.
+        POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+        combinerPkgr.setCombiner(true);
+        combinerPkgr.setKeyType(DataType.INTEGER);
+        pkg_c.setPkgr(combinerPkgr);
+        pkg_c.setNumInps(1);
+        edge.combinePlan.addAsLeaf(pkg_c);
+        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+        prjKey.setResultType(DataType.INTEGER);
+        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(prjKey);
+        clrInps.add(pp);
+        POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+        clr.setOutputKey(combineBloomOpKey);
+        edge.combinePlan.addAsLeaf(clr);
+
+        if (createBloomInMap) {
+            // No combiner is needed on the map side as there will be only one bloom filter per map for each partition.
+            // In the reducer, the bloom filters will be combined with the same logic as the reduce in BloomPackager.
+            edge.setCombinerInMap(false);
+            edge.setCombinerInReducer(true);
+        } else {
+            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+            // Deduplicate the keys on the map side to reduce the data sent to reducers.
+            // No combiner is added on the reduce side; the distinct is done during the reduce itself.
+            // One can be added later if needed.
+            edge.setCombinerInMap(true);
+            edge.setCombinerInReducer(false);
+        }
+
+        // Broadcast the final bloom filter to the other inputs
+        for (TezOperator applyBloomOp : applyBloomOps) {
+            applyBloomOp.markFilterBloom();
+            lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
+            POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters);
+            applyBloomOp.plan.remove(lr);
+            applyBloomOp.plan.addAsLeaf(bfr);
+            bfr.setInputKey(combineBloomOpKey);
+            edge = new TezEdgeDescriptor();
+            edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+            edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+            TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge);
+            combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
+        }
+
+    }
+
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Thu Jan 26 17:40:35 2017
@@ -31,9 +31,13 @@ import org.apache.tez.runtime.library.ou
  * Descriptor for Tez edge. It holds combine plan as well as edge properties.
  */
 public class TezEdgeDescriptor implements Serializable {
-    // Combiner runs on both input and output of Tez edge.
-    transient public PhysicalPlan combinePlan;
+
+    public transient PhysicalPlan combinePlan;
     private boolean needsDistinctCombiner;
+    // By default the combiner runs on both the input and the output of a Tez edge.
+    // It can be configured to run only on the output (map) or only on the input (reduce).
+    private Boolean combinerInMap;
+    private Boolean combinerInReducer;
 
     public String inputClassName;
     public String outputClassName;
@@ -74,6 +78,22 @@ public class TezEdgeDescriptor implement
         needsDistinctCombiner = nic;
     }
 
+    public Boolean getCombinerInMap() { // Returns the map-side (edge output) combiner setting; a Boolean, presumably null when never set - confirm
+        return combinerInMap;
+    }
+
+    public void setCombinerInMap(Boolean combinerInMap) { // Stores the map-side (edge output) combiner setting
+        this.combinerInMap = combinerInMap;
+    }
+
+    public Boolean getCombinerInReducer() { // Returns the reduce-side (edge input) combiner setting; a Boolean, presumably null when never set - confirm
+        return combinerInReducer;
+    }
+
+    public void setCombinerInReducer(Boolean combinerInReducer) { // Stores the reduce-side (edge input) combiner setting
+        this.combinerInReducer = combinerInReducer;
+    }
+
     public boolean isUseSecondaryKey() {
         return useSecondaryKey;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Thu Jan 26 17:40:35 2017
@@ -181,7 +181,11 @@ public class TezOperator extends Operato
         // Indicate if this job is a native job
         NATIVE,
         // Indicate if this job does rank counter
-        RANK_COUNTER;
+        RANK_COUNTER,
+        // Indicate if this job constructs bloom filter
+        BUILDBLOOM,
+        // Indicate if this job applies bloom filter
+        FILTERBLOOM;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -453,6 +457,22 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
     }
 
+    public boolean isBuildBloom() { // True if the BUILDBLOOM feature (this job constructs a bloom filter) is set
+        return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public void markBuildBloom() { // Sets the BUILDBLOOM feature on this operator
+        feature.set(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public boolean isFilterBloom() { // True if the FILTERBLOOM feature (this job applies a bloom filter) is set
+        return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
+    public void markFilterBloom() { // Sets the FILTERBLOOM feature on this operator
+        feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Thu Jan 26 17:40:35 2017
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -161,7 +162,7 @@ public class TezPOPackageAnnotator exten
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
-            if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
+            if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
                 return;
             }
             loRearrangeFound++;
@@ -180,7 +181,9 @@ public class TezPOPackageAnnotator exten
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
 
-            Integer index = Integer.valueOf(lrearrange.getIndex());
+            // For BloomPackager there is only one input, but the
+            // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero
+            Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex());
             if(keyInfo.get(index) != null) {
                 if (isPOSplit) {
                     // Case of POSplit having more than one input in case of self join or union
@@ -197,12 +200,20 @@ public class TezPOPackageAnnotator exten
 
             }
 
-            keyInfo.put(index,
-                    new Pair<Boolean, Map<Integer, Integer>>(
-                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-            pkg.getPkgr().setKeyInfo(keyInfo);
-            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
-            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            if (pkg.getPkgr() instanceof BloomPackager ) {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                Boolean.FALSE, new HashMap<Integer, Integer>()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+            } else {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+                pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+                pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            }
+
         }
 
         /**

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1780431&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Thu Jan 26 17:40:35 2017
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class BloomPackager extends Packager {
+    // Packager that, depending on its flags, merges bloom filters, builds a bloom filter from keys, or emits distinct keys (see getNext)
+    private static final long serialVersionUID = 1L;
+
+    private boolean bloomCreatedInMap; // true: getNext() merges bloom filters; false: it works on keys
+    private int vectorSizeBytes; // bloom filter vector size in bytes (multiplied by 8 for the BloomFilter constructor)
+    private int numHash; // passed through to the BloomFilter constructor
+    private int hashType; // passed through to the BloomFilter constructor
+    private byte bloomKeyType; // type passed to DataType.toBytes when converting a key for the filter
+    private boolean isCombiner; // when bloomCreatedInMap is false, selects getDistinctBloomKeys() over createBloomFilter()
+
+    private transient ByteArrayOutputStream baos; // serialization buffer, created lazily and reused
+    private transient Iterator<Object> distinctKeyIter; // distinct keys still to be emitted; null when no iteration is in progress
+
+    public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super();
+        this.bloomCreatedInMap = bloomCreatedInMap;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    public void setBloomKeyType(byte keyType) {
+        bloomKeyType = keyType;
+    }
+
+    public void setCombiner(boolean isCombiner) {
+        this.isCombiner = isCombiner;
+    }
+
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+    // Dispatches on bloomCreatedInMap and isCombiner; IOExceptions from the helpers are wrapped in ExecException
+    @Override
+    public Result getNext() throws ExecException {
+        try {
+            if (bloomCreatedInMap) {
+                if (bags == null) {
+                    return new Result(POStatus.STATUS_EOP, null);
+                }
+                // Same function for combiner and reducer
+                return combineBloomFilters();
+            } else {
+                if (isCombiner) {
+                    return getDistinctBloomKeys(); // NOTE(review): no bags == null guard on this path, unlike the other two - confirm intended
+                } else {
+                    if (bags == null) {
+                        return new Result(POStatus.STATUS_EOP, null);
+                    }
+                    return createBloomFilter();
+                }
+            }
+        } catch (IOException e) {
+            throw new ExecException("Error while constructing final bloom filter", e);
+        }
+    }
+    // ORs every bloom filter in bags[0] into one and returns it serialized together with the attached key
+    private Result combineBloomFilters() throws IOException {
+        // We get a bag of bloom filters. Combine them into one
+        Iterator<Tuple> iter = bags[0].iterator();
+        Tuple tup = iter.next(); // NOTE(review): no hasNext() check, so the bag is assumed to be non-empty - confirm
+        DataByteArray bloomBytes = (DataByteArray) tup.get(0);
+        BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+        while (iter.hasNext()) {
+            tup = iter.next();
+            bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+        // The serialized length of the first filter is used as the buffer size hint
+        return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
+    }
+    // Builds one bloom filter from the distinct keys in bags[0] and returns it serialized together with the attached key
+    private Result createBloomFilter() throws IOException {
+        // We get a bag of keys. Create a bloom filter from them
+        // First do distinct of the keys. Not using DistinctBag as memory should not be a problem.
+        HashSet<Object> bloomKeys = new HashSet<>();
+        Iterator<Tuple> iter = bags[0].iterator();
+        while (iter.hasNext()) {
+            bloomKeys.add(iter.next().get(0));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); // bytes * 8 = vector size in bits
+        for (Object bloomKey: bloomKeys) {
+            Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
+            bloomFilter.add(k);
+        }
+        bloomKeys = null;
+        return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64); // NOTE(review): + 64 is presumably headroom for the serialized header - confirm
+
+    }
+    // Serializes bloomFilter and returns a STATUS_OK result holding the tuple (partition, serialized bytes)
+    private Result getSerializedBloomFilter(Object partition,
+            BloomFilter bloomFilter, int serializedSize) throws ExecException,
+            IOException {
+        if (baos == null) {
+            baos = new ByteArrayOutputStream(serializedSize);
+        }
+        baos.reset(); // the buffer is reused across calls; serializedSize only sizes the first allocation
+        DataOutputStream dos = new DataOutputStream(baos);
+        bloomFilter.write(dos);
+        dos.flush();
+
+        Tuple res = mTupleFactory.newTuple(2);
+        res.set(0, partition);
+        res.set(1, new DataByteArray(baos.toByteArray()));
+
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+    // Emits one (key, distinct bloom key) tuple per call, then STATUS_EOP once the distinct keys of bags[0] are exhausted
+    private Result getDistinctBloomKeys() throws ExecException {
+        if (distinctKeyIter == null) {
+            HashSet<Object> bloomKeys = new HashSet<>();
+            Iterator<Tuple> iter = bags[0].iterator();
+            while (iter.hasNext()) {
+                bloomKeys.add(iter.next().get(0));
+            }
+            distinctKeyIter = bloomKeys.iterator();
+        }
+        while (distinctKeyIter.hasNext()) { // the body always returns, so at most one key is emitted per call
+            Tuple res = mTupleFactory.newTuple(2);
+            res.set(0, key);
+            res.set(1, distinctKeyIter.next());
+
+            Result r = new Result();
+            r.result = res;
+            r.returnStatus = POStatus.STATUS_OK;
+            return r;
+        }
+        distinctKeyIter = null; // reset so the next call starts over on the attached bag
+        return new Result(POStatus.STATUS_EOP, null);
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1780431&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Thu Jan 26 17:40:35 2017
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
+    private static final long serialVersionUID = 1L;
+    // Local rearrange that only writes records whose key passes the bloom filter(s) read from the Tez input inputKey
+    private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
+    private String inputKey; // key of the Tez input that supplies the bloom filters
+    private transient KeyValueReader reader;
+    private transient String cacheKey; // ObjectCache key; assigned in addInputsToSkip()
+    private int numBloomFilters; // size of the bloomFilters array and modulus for the partition computation
+    private transient BloomFilter[] bloomFilters; // indexed by partition; stays null if no filter record was read
+
+    public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
+        super(lr);
+        this.numBloomFilters = numBloomFilters;
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+    // Computes the cache key and, if the bloom filters are already in the ObjectCache, adds the input to inputsToSkip
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "bloom-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+    // Takes the bloom filters from the ObjectCache or, failing that, reads them from the Tez input and caches them
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); // NOTE(review): cacheKey is only assigned in addInputsToSkip(); presumably that always runs first - confirm
+        if (cacheValue != null) {
+            bloomFilters = (BloomFilter[]) cacheValue;
+            return;
+        }
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+            while (reader.next()) {
+                if (bloomFilters == null) {
+                    bloomFilters = new BloomFilter[numBloomFilters];
+                }
+                Tuple val = (Tuple) reader.getCurrentValue();
+                int index = (int) val.get(0); // field 0: slot of this filter in the array
+                bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1)); // field 1: serialized bloom filter
+            }
+            ObjectCache.getInstance().cache(cacheKey, bloomFilters); // bloomFilters is still null here if the input had no records
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+    // Writes every rearranged tuple whose key passes the bloom filter; returns only on EOP, error or an unknown status
+    @Override
+    public Result getNextTuple() throws ExecException {
+        // bloomFilters is null when attachInputs() read no bloom filter records
+        // If there is no bloom filter, then it means the right input was empty.
+        // Skip processing.
+        if (bloomFilters == null) {
+            return RESULT_EOP;
+        }
+
+        while (true) {
+            res = super.getRearrangedTuple();
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        // Skip the record if key is not in the bloom filter
+                        if (!isKeyInBloomFilter(result.get(1))) {
+                            continue;
+                        }
+                        PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                case POStatus.STATUS_ERR:
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+    // Returns false for a null key or a missing partition filter; otherwise the membership test of the matching filter
+    private boolean isKeyInBloomFilter(Object key) throws ExecException {
+        if (key == null) {
+            // Null values are dropped in an inner join and in the case of outer join,
+            // POBloomFilterRearrangeTez is only in the plan on the non outer relation.
+            // So just skip them
+            return false;
+        }
+        if (bloomFilters.length == 1) {
+            // Skip computing hashcode
+            Key k = new Key(DataType.toBytes(key, keyType));
+            return bloomFilters[0].membershipTest(k);
+        } else {
+            int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; // masking the sign bit keeps the index non-negative
+            BloomFilter filter = bloomFilters[partition];
+            if (filter != null) {
+                Key k = new Key(DataType.toBytes(key, keyType));
+                return filter.membershipTest(k);
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBloomFilterRearrangeTez) super.clone();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BloomFilter Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1780431&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Thu Jan 26 17:40:35 2017
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * This operator writes out the key and value for the hash join reduce operation, similar to POLocalRearrangeTez.
+ * In addition, it writes out either the bloom filters constructed from the join keys
+ * (bloom join map strategy) or the join keys themselves (reduce strategy).
+ *
+ * Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
+ * It also allows us to have lower false positives with smaller vector sizes.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
+
+    public static final String DEFAULT_BLOOM_STRATEGY = "map"; // strategy assumed when PIG_BLOOMJOIN_STRATEGY is unset
+    public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11; // default filter count when the strategy is not "map"
+    public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3; // presumably the default for numHash - confirm at the call site
+    public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur"; // presumably the default hash type name - confirm at the call site
+    public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024; // 1 MiB; presumably the default for vectorSizeBytes
+
+    private String bloomOutputKey; // name of the output that attachOutputs resolves to the bloom writer
+    private boolean skipNullKeys = false; // when true, getNextTuple drops tuples whose key is null
+    private boolean createBloomInMap; // true: build filters here and write them at end of input; false: write the keys
+    private int numBloomFilters; // number of bloom filter partitions
+    private int vectorSizeBytes; // size of each filter in bytes (multiplied by 8 when the filter is created)
+    private int numHash; // passed to the BloomFilter constructor
+    private int hashType; // passed to the BloomFilter constructor
+
+    private transient BloomFilter[] bloomFilters; // allocated in attachOutputs; entries are created on first use
+    private transient KeyValueWriter bloomWriter; // writer of the bloom output, set in attachOutputs
+    private transient PigNullableWritable nullKey; // cached writable for null keys, created lazily in getNextTuple
+    private transient Tuple bloomValue; // single-field tuple reused for every bloom record
+    private transient NullableTuple bloomNullableTuple; // wraps bloomValue; the value handed to bloomWriter
+
+    public POBuildBloomRearrangeTez(POLocalRearrangeTez lr, boolean createBloomInMap,
+            int numBloomFilters, int vectorSizeBytes, int numHash, int hashType) {
+        super(lr);
+        // Store the bloom filter build settings for use at runtime
+        this.hashType = hashType;
+        this.numHash = numHash;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numBloomFilters = numBloomFilters;
+        this.createBloomInMap = createBloomInMap;
+    }
+
+    public static int getNumBloomFilters(Configuration conf) {
+        // The "map" strategy defaults to a single filter; every other strategy
+        // defaults to DEFAULT_NUM_BLOOM_FILTERS_REDUCE.
+        String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY);
+        int defaultCount = "map".equals(strategy) ? 1 : DEFAULT_NUM_BLOOM_FILTERS_REDUCE;
+        return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, defaultCount);
+    }
+
+    public void setSkipNullKeys(boolean skipNullKeys) {
+        this.skipNullKeys = skipNullKeys; // when true, getNextTuple drops tuples whose key is null
+    }
+
+    public void setBloomOutputKey(String bloomOutputKey) {
+        this.bloomOutputKey = bloomOutputKey; // output name that attachOutputs resolves to the bloom writer
+    }
+
+    @Override
+    public boolean containsOutputKey(String key) {
+        // True when key names either the regular rearrange output or the bloom output.
+        // bloomOutputKey stays null until setBloomOutputKey is called, so guard it
+        // instead of dereferencing it blindly.
+        return super.containsOutputKey(key) || (bloomOutputKey != null && bloomOutputKey.equals(key));
+    }
+
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey, bloomOutputKey }; // regular rearrange output first, then the bloom output
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        final boolean matchesRegular = oldOutputKey.equals(outputKey);
+        final boolean matchesBloom = !matchesRegular && oldOutputKey.equals(bloomOutputKey);
+        // At most one output is renamed; the regular output wins if both match.
+        outputKey = matchesRegular ? newOutputKey : outputKey;
+        bloomOutputKey = matchesBloom ? newOutputKey : bloomOutputKey;
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs, Configuration conf)
+            throws ExecException {
+        super.attachOutputs(outputs, conf);
+        final LogicalOutput bloomOutput = outputs.get(bloomOutputKey);
+        if (bloomOutput == null) {
+            throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
+        }
+        try {
+            bloomWriter = (KeyValueWriter) bloomOutput.getWriter();
+            LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + bloomOutput + ", writer=" + bloomWriter);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+        bloomValue = mTupleFactory.newTuple(1); // single-field tuple reused for every bloom record
+        bloomNullableTuple = new NullableTuple(bloomValue);
+        bloomFilters = new BloomFilter[numBloomFilters]; // slots are filled lazily as keys arrive
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        // Consumes input until EOP or error: writes each tuple to the join output and feeds its key to the bloom side.
+        PigNullableWritable key;
+        while (true) {
+            res = super.getRearrangedTuple(); // on STATUS_OK the result is read below as (index, key, value)
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        Object keyObj = result.get(1);
+                        if (keyObj != null) {
+                            key = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            // Only non-null keys feed the bloom side: null keys cannot be part of
+                            // a bloom filter, and since they are also dropped during join we can skip them
+                            if (createBloomInMap) { // build the filter in this task
+                                addKeyToBloomFilter(keyObj);
+                            } else { // otherwise ship the key itself to the bloom output
+                                writeJoinKeyForBloom(keyObj);
+                            }
+                        } else if (skipNullKeys) {
+                            // Inner join. So don't bother writing null key
+                            continue;
+                        } else {
+                            if (nullKey == null) { // create the null-key writable once and reuse it
+                                nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            }
+                            key = nullKey;
+                        }
+
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue; // fetch the next input tuple
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    if (this.parentPlan.endOfAllInput && createBloomInMap) {
+                        // In case of Split will get EOP after every record.
+                        // So check for endOfAllInput
+                        writeBloomFilters();
+                    } // intentional fall through: EOP is returned to the caller just like ERR
+                case POStatus.STATUS_ERR:
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+
+    /**
+     * Adds the join key to the bloom filter of its partition, creating the
+     * filter on first use.
+     *
+     * @param key non-null join key
+     */
+    private void addKeyToBloomFilter(Object key) throws ExecException {
+        final Key bloomKey = new Key(DataType.toBytes(key, keyType));
+        // A single filter needs no hashing; otherwise partition by the sign-masked hash code
+        final int slot = bloomFilters.length == 1
+                ? 0
+                : (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+        if (bloomFilters[slot] == null) {
+            bloomFilters[slot] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+        }
+        bloomFilters[slot].add(bloomKey);
+    }
+
+    private void writeJoinKeyForBloom(Object key) throws IOException {
+        NullableIntWritable partitionKey = new NullableIntWritable((key.hashCode() & Integer.MAX_VALUE) % numBloomFilters);
+        bloomValue.set(0, key); // bloomNullableTuple wraps bloomValue, so this becomes the record value
+        bloomWriter.write(partitionKey, bloomNullableTuple);
+    }
+
+    private void writeBloomFilters() throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream(vectorSizeBytes + 64);
+        DataOutputStream out = new DataOutputStream(buffer);
+        for (int partition = 0; partition < bloomFilters.length; partition++) {
+            if (bloomFilters[partition] != null) { // partitions that never received a key are not written
+                bloomFilters[partition].write(out);
+                out.flush();
+                bloomValue.set(0, new DataByteArray(buffer.toByteArray()));
+                bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+                buffer.reset(); // reuse the buffer for the next filter
+            }
+        }
+    }
+
+    @Override
+    public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBuildBloomRearrangeTez) super.clone(); // narrow the inherited clone result to this class
+    }
+
+    @Override
+    public String name() {
+        String head = getAliasString() + "BuildBloom Rearrange"
+                + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}";
+        String tail = "(" + mIsDistinct + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
+        return head + tail; // human-readable description of this operator and its two outputs
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Thu Jan 26 17:40:35 2017
@@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends
         }
     }
 
-    public String getOutputKey() {
-        return outputKey;
+    public boolean containsOutputKey(String key) {
+        return outputKey.equals(key);
     }
 
     public void setOutputKey(String outputKey) {
@@ -122,6 +122,10 @@ public class POLocalRearrangeTez extends
         }
     }
 
+    protected Result getRearrangedTuple() throws ExecException {
+        return super.getNextTuple(); // exposes the superclass's getNextTuple result to subclasses such as POBuildBloomRearrangeTez
+    }
+
     @Override
     public Result getNextTuple() throws ExecException {
         res = super.getNextTuple();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Thu Jan 26 17:40:35 2017
@@ -129,7 +129,9 @@ public class POShuffleTezLoad extends PO
                 finished[i] = !readers.get(i).next();
             }
 
-            this.readOnceOneBag = (numInputs == 1) && (pkgr instanceof CombinerPackager || pkgr instanceof LitePackager);
+            this.readOnceOneBag = (numInputs == 1)
+                    && (pkgr instanceof CombinerPackager
+                            || pkgr instanceof LitePackager || pkgr instanceof BloomPackager);
             if (readOnceOneBag) {
                 readOnce[0] = true;
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Thu Jan 26 17:40:35 2017
@@ -69,6 +69,11 @@ public class CombinerOptimizer extends T
         }
 
         for (TezOperator from : predecessors) {
+            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+            if (!combinePlan.isEmpty()) {
+                // Cases like bloom join have combine plan already set
+                continue;
+            }
             List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
             if (rearranges.isEmpty()) {
                 continue;
@@ -77,7 +82,7 @@ public class CombinerOptimizer extends T
             POLocalRearrangeTez connectingLR = null;
             PhysicalPlan rearrangePlan = from.plan;
             for (POLocalRearrangeTez lr : rearranges) {
-                if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+                if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                     connectingLR = lr;
                     break;
                 }
@@ -90,7 +95,6 @@ public class CombinerOptimizer extends T
 
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.
-            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
             CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
 
             if(!combinePlan.isEmpty()) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Thu Jan 26 17:40:35 2017
@@ -123,8 +123,8 @@ public class ParallelismSetter extends T
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled
-                            && tezOp.isIntermediateReducer()
                             && !tezOp.isDontEstimateParallelism()
+                            && tezOp.isIntermediateReducer()
                             && tezOp.isOverrideIntermediateParallelism()) {
                         overrideRequestedParallelism = true;
                     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Thu Jan 26 17:40:35 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
         POLocalRearrangeTez connectingLR = null;
         PhysicalPlan rearrangePlan = from.plan;
         for (POLocalRearrangeTez lr : rearranges) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 connectingLR = lr;
                 break;
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Thu Jan 26 17:40:35 2017
@@ -30,6 +30,8 @@ public class TezEstimatedParallelismClea
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        tezOp.setEstimatedParallelism(-1);
+        if (!tezOp.isDontEstimateParallelism()) {
+            tezOp.setEstimatedParallelism(-1);
+        }
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Thu Jan 26 17:40:35 2017
@@ -97,16 +97,16 @@ public class TezOperDependencyParallelis
 
         bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
 
+        // If we have already estimated parallelism, use that one
+        if (tezOper.getEstimatedParallelism() != -1) {
+            return tezOper.getEstimatedParallelism();
+        }
+
         // If parallelism is set explicitly, respect it
         if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
             return tezOper.getRequestedParallelism();
         }
 
-        // If we have already estimated parallelism, use that one
-        if (tezOper.getEstimatedParallelism()!=-1) {
-            return tezOper.getEstimatedParallelism();
-        }
-
         List<TezOperator> preds = plan.getPredecessors(tezOper);
         if (preds==null) {
             throw new IOException("Cannot estimate parallelism for source vertex");

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1780431&r1=1780430&r2=1780431&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Thu Jan 26 17:40:35 2017
@@ -38,6 +38,7 @@ public class LOJoin extends LogicalRelat
      */
     public static enum JOINTYPE {
         HASH,    // Hash Join
+        BLOOM,   // Bloom Join
         REPLICATED, // Fragment Replicated join
         SKEWED, // Skewed Join
         MERGE,   // Sort Merge Join