You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/10/17 15:22:57 UTC
svn commit: r1765312 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/
src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/
src/org/apache/pig/backend/hadoop/executionengine/tez/util/
src/org/apache/pig...
Author: rohini
Date: Mon Oct 17 15:22:57 2016
New Revision: 1765312
URL: http://svn.apache.org/viewvc?rev=1765312&view=rev
Log:
PIG-5041: RoundRobinPartitioner is not deterministic when order of input records change (rohini)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java
pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 17 15:22:57 2016
@@ -48,6 +48,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5041: RoundRobinPartitioner is not deterministic when order of input records change (rohini)
+
PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini
PIG-5038: Pig Limit_2 e2e test failed with sort check (Konstantin_Harasov via rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Mon Oct 17 15:22:57 2016
@@ -45,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -53,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag
import org.apache.pig.builtin.JsonStorage;
import org.apache.pig.builtin.OrcStorage;
import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -590,7 +590,7 @@ public class UnionOptimizer extends TezO
// more union predecessors. Change it to SCATTER_GATHER
if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
edge.dataMovementType = DataMovementType.SCATTER_GATHER;
- edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.partitionerClass = HashValuePartitioner.class;
edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1765312&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java Mon Oct 17 15:22:57 2016
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+
+public class HashValuePartitioner extends Partitioner<Writable, Writable> {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public int getPartition(Writable key, Writable value, int numPartitions) {
+ int hash = 17;
+ Tuple tuple;
+ if (value instanceof Tuple) {
+ // union optimizer turned off
+ tuple = (Tuple) value;
+ } else {
+ // union followed by order by or skewed join
+ tuple = (Tuple)((NullableTuple) value).getValueAsPigType();
+ }
+ if (tuple != null) {
+ for (Object o : tuple.getAll()) {
+ if (o != null) {
+ // Skip computing hashcode for bags.
+ // Order of elements in the map/bag may be different on each run
+ if (o instanceof DataBag) {
+ hash = 31 * hash;
+ } else if (o instanceof Map) {
+ // Including size of map as it is easily available
+ // Not doing for DataBag as some implementations actually
+ // iterate through all elements in the bag to get the size.
+ hash = 31 * hash + ((Map) o).size();
+ } else {
+ hash = 31 * hash + o.hashCode();
+ }
+ }
+ }
+ }
+ return (hash & Integer.MAX_VALUE) % numPartitions;
+ }
+
+}
\ No newline at end of file
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Mon Oct 17 15:22:57 2016
@@ -40,9 +40,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.TOBAG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
@@ -269,7 +269,7 @@ public class TezCompilerUtil {
} else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
- edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.partitionerClass = HashValuePartitioner.class;
}
edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
Modified: pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/RoundRobinPartitioner.java Mon Oct 17 15:22:57 2016
@@ -22,6 +22,17 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
+/**
+ * This partitioner should be used with extreme caution and only in cases
+ * where the order of output records is guaranteed to be same. If the order of
+ * output records can vary on retries which is mostly the case, map reruns
+ * due to shuffle fetch failures can lead to data being partitioned differently
+ * and result in incorrect output due to loss or duplication of data.
+ * Refer PIG-5041 for more details.
+ *
+ * This will be removed in the next release as it is risky to use in most cases.
+ */
+@Deprecated
public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
implements Configurable {
Modified: pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1765312&r1=1765311&r2=1765312&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezJobExecution.java Mon Oct 17 15:22:57 2016
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigRunner;
import org.apache.pig.PigServer;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.test.Util;
import org.apache.pig.tools.pigstats.JobStats;
@@ -72,19 +71,17 @@ public class TestTezJobExecution {
}
@Test
- public void testUnionParallelRoundRobinBatchSize() throws IOException {
+ public void testUnionParallelHashValuePartition() throws IOException {
String output = TEST_DIR + Path.SEPARATOR + "output1";
String query = "A = LOAD '" + INPUT_FILE + "';"
+ "B = LOAD '" + INPUT_FILE + "';"
+ "C = UNION A, B PARALLEL 2;"
+ "STORE C into '" + output + "';";
- pigServer.getPigContext().getProperties().setProperty(
- RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, "3");
pigServer.registerQuery(query);
String part0 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00000"));
String part1 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00001"));
- assertEquals("1\n1\n1\n1\n1\n1\n", part0);
- assertEquals("2\n2\n2\n2\n2\n2\n", part1);
+ assertEquals("2\n2\n2\n2\n2\n2\n", part0);
+ assertEquals("1\n1\n1\n1\n1\n1\n", part1);
}
@Test
@@ -108,7 +105,7 @@ public class TestTezJobExecution {
// Recovery is not disabled when there is auto parallelism. Should reuse AM application session
PigStats stats = PigRunner.run(args, listener);
assertTrue(stats.isSuccessful());
- assertEquals(listener.getJobsStarted().size(), 1);
+ assertEquals(1, listener.getJobsStarted().size());
Util.deleteFile(pigServer.getPigContext(), output1);
Util.deleteFile(pigServer.getPigContext(), output2);
@@ -122,7 +119,7 @@ public class TestTezJobExecution {
scriptFile };
stats = PigRunner.run(args, listener);
assertTrue(stats.isSuccessful());
- assertEquals(listener.getJobsStarted().size(), 2);
+ assertEquals(2, listener.getJobsStarted().size());
}