You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/05/23 21:18:10 UTC

svn commit: r1745276 - in /pig/trunk: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ src/org/apache/pig/builtin/ test/org/apache/pig/tez/

Author: rohini
Date: Mon May 23 21:18:10 2016
New Revision: 1745276

URL: http://svn.apache.org/viewvc?rev=1745276&view=rev
Log:
PIG-4885: Turn off union optimizer if there is PARALLEL clause in union in Tez (rohini)

Added:
    pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
    pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1745276&r1=1745275&r2=1745276&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 23 21:18:10 2016
@@ -34,6 +34,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4885: Turn off union optimizer if there is PARALLEL clause in union in Tez (rohini)
+
 PIG-4894: Add API for StoreFunc to specify if they are write safe from two different vertices (rohini)
 
 PIG-4884: Tez needs to use DistinctCombiner.Combine (rohini)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml?rev=1745276&r1=1745275&r2=1745276&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/basic.xml Mon May 23 21:18:10 2016
@@ -8407,7 +8407,7 @@ X = STREAM A THROUGH `stream.pl` as (f1:
    <table>
       <tr> 
             <td>
-               <p>alias = UNION [ONSCHEMA] alias, alias [, alias …];</p>
+               <p>alias = UNION [ONSCHEMA] alias, alias [, alias …] [PARALLEL n];</p>
             </td>
          </tr> 
    </table></section>
@@ -8434,6 +8434,16 @@ X = STREAM A THROUGH `stream.pl` as (f1:
                All inputs to the union must have a non-unknown (non-null) <a href="#schemas">schema</a>.</p>
             </td>
          </tr>
+         
+     <tr>
+        <td>
+           <p>PARALLEL n</p>
+        </td>
+        <td>
+           <p>This applies only to Tez execution mode; it is not supported in MapReduce mode. Specifying PARALLEL introduces an extra reduce step that slightly degrades performance. Its primary purpose is to control the number of output files.</p>
+           <p>For more information, see <a href="perf.html#parallel">Use the Parallel Features</a>.</p>
+        </td>
+     </tr>
    </table>
    </section>
    

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1745276&r1=1745275&r2=1745276&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Mon May 23 21:18:10 2016
@@ -957,6 +957,9 @@ You can include the PARALLEL clause with
 <a href="basic.html#join-inner">JOIN (inner)</a>, 
 <a href="basic.html#join-outer">JOIN (outer)</a>, and
 <a href="basic.html#order-by">ORDER BY</a>.
+  The PARALLEL clause can also be used with <a href="basic.html#union">UNION</a> when Tez is the execution mode.
+  It turns off the union optimization and introduces an extra reduce step.
+  Although the extra step slightly degrades performance, it is very useful for controlling the number of output files.
 </p>
 
 <p>The number of reducers you need for a particular construct in Pig that forms a MapReduce boundary depends entirely on (1) your data and the number of intermediate keys you are generating in your mappers and (2) the partitioner and distribution of map (combiner) output keys. In the best cases we have seen that a reducer processing about 1 GB of data behaves efficiently.</p>

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1745276&r1=1745275&r2=1745276&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Mon May 23 21:18:10 2016
@@ -109,6 +109,12 @@ public class UnionOptimizer extends TezO
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
             return false;
         }
+
+        // If user has specified a PARALLEL clause with the union operator
+        // turn off union optimization
+        if (tezOp.getRequestedParallelism() != -1) {
+            return false;
+        }
         // Two vertices separately ranking with 1 to n and writing to output directly
         // will make each rank repeate twice which is wrong. Rank always needs to be
         // done from single vertex to have the counting correct.

Modified: pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1745276&r1=1745275&r2=1745276&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java Mon May 23 21:18:10 2016
@@ -17,15 +17,52 @@
  */
 package org.apache.pig.builtin;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
-    private int num = 0;
+/**
+ * Assigns records to partitions in round-robin order, optionally in batches.
+ */
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
+        implements Configurable {
+
+    /**
+     * Configuration key for the round robin batch size: the number of
+     * consecutive records sent to one partition before moving on to the next.
+     * The default of 0 (or any non-positive value) sends each record to the
+     * next partition in turn. A larger batch size increases the chance that
+     * similar records stay in the same partition when the output is already
+     * sorted, which can give better compression.
+     */
+    public static final String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size";
+    private int num = -1;
+    private int batchSize = 0;
+    private int currentBatchCount = 0;
+    private Configuration conf;
 
     @Override
     public int getPartition(Writable key, Writable value, int numPartitions) {
-        num = ++num % numPartitions;
+        // Move to the next partition at the start of every batch. Without
+        // batching (batchSize <= 0) every record starts a new batch.
+        if (batchSize <= 0 || currentBatchCount == 0) {
+            num = (num + 1) % numPartitions;
+        }
+        if (batchSize > 0 && ++currentBatchCount == batchSize) {
+            currentBatchCount = 0;
+        }
         return num;
     }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
 }

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1745276&r1=1745275&r2=1745276&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Mon May 23 21:18:10 2016
@@ -644,6 +644,15 @@ public class TestTezCompiler {
         resetScope();
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false);
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true);
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
+                "c = union onschema a, b PARALLEL 15;" +
+                "store c into 'file:///tmp/pigoutput';";
+        // Union optimization should be turned off if PARALLEL clause is specified
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld");
     }
 
     @Test

Added: pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1745276&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java (added)
+++ pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java Mon May 23 21:18:10 2016
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.test.Util;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for Tez-specific job execution behaviour.
+ */
+public class TestTezJobExecution {
+
+    private static final String TEST_DIR = Util.getTestDirectory(TestTezJobExecution.class);
+
+    private PigServer pigServer;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        Util.deleteDirectory(new File(TEST_DIR));
+        new File(TEST_DIR).mkdirs();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        Util.deleteDirectory(new File(TEST_DIR));
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(Util.getLocalTestMode());
+    }
+
+    @After
+    public void tearDown() {
+        // Shut down the PigServer created in setUp() so it does not leak
+        // across test methods.
+        if (pigServer != null) {
+            pigServer.shutdown();
+            pigServer = null;
+        }
+    }
+
+    @Test
+    public void testUnionParallelRoundRobinBatchSize() throws IOException {
+        String input = TEST_DIR + Path.SEPARATOR + "input1";
+        String output = TEST_DIR + Path.SEPARATOR + "output1";
+        Util.createInputFile(pigServer.getPigContext(), input, new String[] {
+            "1", "1", "1", "2", "2", "2"
+        });
+        String query = "A = LOAD '" + input + "';"
+                + "B = LOAD '" + input + "';"
+                + "C = UNION A, B PARALLEL 2;"
+                + "STORE C into '" + output + "';";
+        // The PARALLEL union is expected to partition via RoundRobinPartitioner:
+        // with a batch size of 3, each loader sends its first three records ("1")
+        // to partition 0 and its next three ("2") to partition 1.
+        pigServer.getPigContext().getProperties().setProperty(
+                RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, "3");
+        pigServer.registerQuery(query);
+        String part0 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00000"), "UTF-8");
+        String part1 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00001"), "UTF-8");
+        assertEquals("1\n1\n1\n1\n1\n1\n", part0);
+        assertEquals("2\n2\n2\n2\n2\n2\n", part1);
+    }
+
+}