You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2014/09/01 07:23:26 UTC
svn commit: r1621675 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/
ql/src/test/org/apache/hadoop/hive/ql/exec/
Author: navis
Date: Mon Sep 1 05:23:26 2014
New Revision: 1621675
URL: http://svn.apache.org/r1621675
Log:
HIVE-7669 : parallel order by clause on a string column fails with IOException: Split points are out of order (Navis, reviewed by Szehon Ho and Lefty Leverenz)
Added:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Sep 1 05:23:26 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.hive.conf.Validator.PatternSet;
import org.apache.hadoop.hive.conf.Validator.RangeValidator;
+import org.apache.hadoop.hive.conf.Validator.RatioValidator;
import org.apache.hadoop.hive.conf.Validator.StringSet;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
@@ -1029,9 +1030,11 @@ public class HiveConf extends Configurat
"When enabled dynamic partitioning column will be globally sorted.\n" +
"This way we can keep only one record writer open for each partition value\n" +
"in the reducer thereby reducing the memory pressure on reducers."),
- HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, ""),
- HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, ""),
- HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, ""),
+
+ HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, "Uses sampling on order-by clause for parallel execution."),
+ HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, "Total number of samples to be obtained."),
+ HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, new RatioValidator(),
+ "Probability with which a row will be chosen."),
// whether to optimize union followed by select followed by filesink
// It creates sub-directories in the final output, so should not be turned on in systems
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java Mon Sep 1 05:23:26 2014
@@ -147,7 +147,7 @@ public interface Validator {
public String validate(String value) {
try {
float fvalue = Float.valueOf(value);
- if (fvalue <= 0 || fvalue >= 1) {
+ if (fvalue < 0 || fvalue > 1) {
return "Invalid ratio " + value + ", which should be in between 0 to 1";
}
} catch (NumberFormatException e) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java Mon Sep 1 05:23:26 2014
@@ -20,24 +20,50 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
-public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {
- private Partitioner<BytesWritable, Object> partitioner
- = new TotalOrderPartitioner<BytesWritable, Object>();
+ private static final Log LOG = LogFactory.getLog(HiveTotalOrderPartitioner.class);
+ private Partitioner<BytesWritable, Object> partitioner;
+
+ @Override
public void configure(JobConf job) {
- JobConf newconf = new JobConf(job);
- newconf.setMapOutputKeyClass(BytesWritable.class);
- partitioner.configure(newconf);
+ if (partitioner == null) {
+ configurePartitioner(new JobConf(job));
+ }
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ // workaround for TEZ-1403
+ if (partitioner == null) {
+ configurePartitioner(new JobConf(conf));
+ }
}
public int getPartition(HiveKey key, Object value, int numPartitions) {
return partitioner.getPartition(key, value, numPartitions);
}
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ private void configurePartitioner(JobConf conf) {
+ LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+ conf.setMapOutputKeyClass(BytesWritable.class);
+ partitioner = new TotalOrderPartitioner<BytesWritable, Object>();
+ partitioner.configure(conf);
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Mon Sep 1 05:23:26 2014
@@ -27,6 +27,8 @@ import java.util.Comparator;
import java.util.List;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +47,8 @@ import org.apache.hadoop.mapred.OutputCo
public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
+ private static final Log LOG = LogFactory.getLog(PartitionKeySampler.class);
+
public static final Comparator<byte[]> C = new Comparator<byte[]>() {
public final int compare(byte[] o1, byte[] o2) {
return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
@@ -74,32 +78,46 @@ public class PartitionKeySampler impleme
}
// sort and pick partition keys
- // copied from org.apache.hadoop.mapred.lib.InputSampler
+ // originally copied from org.apache.hadoop.mapred.lib.InputSampler, but that version seemed to have a bug
private byte[][] getPartitionKeys(int numReduce) {
if (sampled.size() < numReduce - 1) {
throw new IllegalStateException("not enough number of sample");
}
byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
Arrays.sort(sorted, C);
- byte[][] partitionKeys = new byte[numReduce - 1][];
- float stepSize = sorted.length / (float) numReduce;
- int last = -1;
- for(int i = 1; i < numReduce; ++i) {
- int k = Math.round(stepSize * i);
- while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
- k++;
+
+ return toPartitionKeys(sorted, numReduce);
+ }
+
+ static final byte[][] toPartitionKeys(byte[][] sorted, int numPartition) {
+ byte[][] partitionKeys = new byte[numPartition - 1][];
+
+ int last = 0;
+ int current = 0;
+ for(int i = 0; i < numPartition - 1; i++) {
+ current += Math.round((float)(sorted.length - current) / (numPartition - i));
+ while (i > 0 && current < sorted.length && C.compare(sorted[last], sorted[current]) == 0) {
+ current++;
+ }
+ if (current >= sorted.length) {
+ return Arrays.copyOfRange(partitionKeys, 0, i);
}
- if (k >= sorted.length) {
- throw new IllegalStateException("not enough number of sample");
+ if (LOG.isDebugEnabled()) {
+ // print out nth partition key for debugging
+ LOG.debug("Partition key " + current + "th :" + new BytesWritable(sorted[current]));
}
- partitionKeys[i - 1] = sorted[k];
- last = k;
+ partitionKeys[i] = sorted[current];
+ last = current;
}
return partitionKeys;
}
- public void writePartitionKeys(Path path, JobConf job) throws IOException {
+ public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException {
byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+ int numPartition = partitionKeys.length + 1;
+ if (numPartition != job.getNumReduceTasks()) {
+ job.setNumReduceTasks(numPartition);
+ }
FileSystem fs = path.getFileSystem(job);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Mon Sep 1 05:23:26 2014
@@ -371,7 +371,7 @@ public class ExecDriver extends Task<Map
Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
- if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
+ if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
try {
handleSampling(driverContext, mWork, job, conf);
job.setPartitionerClass(HiveTotalOrderPartitioner.class);
@@ -539,7 +539,7 @@ public class ExecDriver extends Task<Map
} else {
throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
}
- sampler.writePartitionKeys(partitionFile, job);
+ sampler.writePartitionKeys(partitionFile, conf, job);
}
/**
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java?rev=1621675&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java Mon Sep 1 05:23:26 2014
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import junit.framework.TestCase;
+
+import java.util.Arrays;
+
+public class TestPartitionKeySampler extends TestCase {
+
+ private static final byte[] _100 = "100".getBytes();
+ private static final byte[] _200 = "200".getBytes();
+ private static final byte[] _300 = "300".getBytes();
+ private static final byte[] _400 = "400".getBytes();
+
+ // The current random sampling implementation in InputSampler always returns
+ // the values at indexes 3, 5 and 8, which can be the same as the previous partition key.
+ // That induces the "Split points are out of order" exception in TotalOrderPartitioner, causing HIVE-7669
+ public void test() throws Throwable {
+ byte[][] sampled;
+ sampled = new byte[][] {
+ _100, _100, _100, _100, _100, _100, _100, _100, _100, _100
+ };
+ assertKeys(sampled, _100); // 3
+
+ sampled = new byte[][] {
+ _100, _100, _100, _100, _100, _100, _100, _100, _200, _200
+ };
+ assertKeys(sampled, _100, _200); // 3, 8
+
+ sampled = new byte[][] {
+ _100, _100, _100, _100 , _200, _200, _200, _300, _300, _300
+ };
+ assertKeys(sampled, _100, _200, _300); // 3, 5, 8
+
+ sampled = new byte[][] {
+ _100, _200, _200, _200, _200, _200, _200, _300, _300, _400
+ };
+ assertKeys(sampled, _200, _300, _400); // 3, 7, 9
+
+ sampled = new byte[][] {
+ _100, _200, _300, _400, _400, _400, _400, _400, _400, _400
+ };
+ assertKeys(sampled, _400); // 3
+ }
+
+ private void assertKeys(byte[][] sampled, byte[]... expected) {
+ byte[][] keys = PartitionKeySampler.toPartitionKeys(sampled, 4);
+ assertEquals(expected.length, keys.length);
+ for (int i = 0; i < expected.length; i++) {
+ assertTrue(Arrays.equals(expected[i], keys[i]));
+ }
+ }
+}