You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2014/09/01 07:23:26 UTC

svn commit: r1621675 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ ql/src/test/org/apache/hadoop/hive/ql/exec/

Author: navis
Date: Mon Sep  1 05:23:26 2014
New Revision: 1621675

URL: http://svn.apache.org/r1621675
Log:
HIVE-7669 : parallel order by clause on a string column fails with IOException: Split points are out of order (Navis, reviewed by Szehon Ho and Lefty Leverenz)

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Sep  1 05:23:26 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.hive.conf.Validator.PatternSet;
 import org.apache.hadoop.hive.conf.Validator.RangeValidator;
+import org.apache.hadoop.hive.conf.Validator.RatioValidator;
 import org.apache.hadoop.hive.conf.Validator.StringSet;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -1029,9 +1030,11 @@ public class HiveConf extends Configurat
         "When enabled dynamic partitioning column will be globally sorted.\n" +
         "This way we can keep only one record writer open for each partition value\n" +
         "in the reducer thereby reducing the memory pressure on reducers."),
-    HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, ""),
-    HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, ""),
-    HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, ""),
+
+    HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, "Uses sampling on order-by clause for parallel execution."),
+    HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, "Total number of samples to be obtained."),
+    HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, new RatioValidator(),
+        "Probability with which a row will be chosen."),
 
     // whether to optimize union followed by select followed by filesink
     // It creates sub-directories in the final output, so should not be turned on in systems

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/Validator.java Mon Sep  1 05:23:26 2014
@@ -147,7 +147,7 @@ public interface Validator {
     public String validate(String value) {
       try {
         float fvalue = Float.valueOf(value);
-        if (fvalue <= 0 || fvalue >= 1) {
+        if (fvalue < 0 || fvalue > 1) {
           return "Invalid ratio " + value + ", which should be in between 0 to 1";
         }
       } catch (NumberFormatException e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java Mon Sep  1 05:23:26 2014
@@ -20,24 +20,50 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
 
-public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {
 
-  private Partitioner<BytesWritable, Object> partitioner
-      = new TotalOrderPartitioner<BytesWritable, Object>();
+  private static final Log LOG = LogFactory.getLog(HiveTotalOrderPartitioner.class);
 
+  private Partitioner<BytesWritable, Object> partitioner;
+
+  @Override
   public void configure(JobConf job) {
-    JobConf newconf = new JobConf(job);
-    newconf.setMapOutputKeyClass(BytesWritable.class);
-    partitioner.configure(newconf);
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(job));
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    // workaround for TEZ-1403
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(conf));
+    }
   }
 
   public int getPartition(HiveKey key, Object value, int numPartitions) {
     return partitioner.getPartition(key, value, numPartitions);
   }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  private void configurePartitioner(JobConf conf) {
+    LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+    conf.setMapOutputKeyClass(BytesWritable.class);
+    partitioner = new TotalOrderPartitioner<BytesWritable, Object>();
+    partitioner.configure(conf);
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Mon Sep  1 05:23:26 2014
@@ -27,6 +27,8 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +47,8 @@ import org.apache.hadoop.mapred.OutputCo
 
 public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
 
+  private static final Log LOG = LogFactory.getLog(PartitionKeySampler.class);
+
   public static final Comparator<byte[]> C = new Comparator<byte[]>() {
     public final int compare(byte[] o1, byte[] o2) {
       return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
@@ -74,32 +78,46 @@ public class PartitionKeySampler impleme
   }
 
   // sort and pick partition keys
-  // copied from org.apache.hadoop.mapred.lib.InputSampler
+  // originally copied from org.apache.hadoop.mapred.lib.InputSampler but seemed to have a bug
   private byte[][] getPartitionKeys(int numReduce) {
     if (sampled.size() < numReduce - 1) {
       throw new IllegalStateException("not enough number of sample");
     }
     byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
     Arrays.sort(sorted, C);
-    byte[][] partitionKeys = new byte[numReduce - 1][];
-    float stepSize = sorted.length / (float) numReduce;
-    int last = -1;
-    for(int i = 1; i < numReduce; ++i) {
-      int k = Math.round(stepSize * i);
-      while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
-        k++;
+
+    return toPartitionKeys(sorted, numReduce);
+  }
+
+  static final byte[][] toPartitionKeys(byte[][] sorted, int numPartition) {
+    byte[][] partitionKeys = new byte[numPartition - 1][];
+
+    int last = 0;
+    int current = 0;
+    for(int i = 0; i < numPartition - 1; i++) {
+      current += Math.round((float)(sorted.length - current) / (numPartition - i));
+      while (i > 0 && current < sorted.length && C.compare(sorted[last], sorted[current]) == 0) {
+        current++;
+      }
+      if (current >= sorted.length) {
+        return Arrays.copyOfRange(partitionKeys, 0, i);
       }
-      if (k >= sorted.length) {
-        throw new IllegalStateException("not enough number of sample");
+      if (LOG.isDebugEnabled()) {
+        // print out nth partition key for debugging
+        LOG.debug("Partition key " + current + "th :" + new BytesWritable(sorted[current]));
       }
-      partitionKeys[i - 1] = sorted[k];
-      last = k;
+      partitionKeys[i] = sorted[current];
+      last = current;
     }
     return partitionKeys;
   }
 
-  public void writePartitionKeys(Path path, JobConf job) throws IOException {
+  public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException {
     byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+    int numPartition = partitionKeys.length + 1;
+    if (numPartition != job.getNumReduceTasks()) {
+      job.setNumReduceTasks(numPartition);
+    }
 
     FileSystem fs = path.getFileSystem(job);
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1621675&r1=1621674&r2=1621675&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Mon Sep  1 05:23:26 2014
@@ -371,7 +371,7 @@ public class ExecDriver extends Task<Map
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
 
-      if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
+      if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
         try {
           handleSampling(driverContext, mWork, job, conf);
           job.setPartitionerClass(HiveTotalOrderPartitioner.class);
@@ -539,7 +539,7 @@ public class ExecDriver extends Task<Map
     } else {
       throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
     }
-    sampler.writePartitionKeys(partitionFile, job);
+    sampler.writePartitionKeys(partitionFile, conf, job);
   }
 
   /**

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java?rev=1621675&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java Mon Sep  1 05:23:26 2014
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import junit.framework.TestCase;
+
+import java.util.Arrays;
+
+public class TestPartitionKeySampler extends TestCase {
+
+  private static final byte[] _100 = "100".getBytes();
+  private static final byte[] _200 = "200".getBytes();
+  private static final byte[] _300 = "300".getBytes();
+  private static final byte[] _400 = "400".getBytes();
+
+  // The current random sampling implementation in InputSampler always returns
+  // the values at index 3, 5, 8, which can be the same as the previous partition key.
+  // That induces the "Split points are out of order" exception in TotalOrderPartitioner, causing HIVE-7669
+  public void test() throws Throwable {
+    byte[][] sampled;
+    sampled = new byte[][] {
+        _100, _100, _100, _100, _100, _100, _100, _100, _100, _100
+    };
+    assertKeys(sampled, _100); // 3
+
+    sampled = new byte[][] {
+        _100, _100, _100, _100, _100, _100, _100, _100, _200, _200
+    };
+    assertKeys(sampled, _100, _200); // 3, 8
+
+    sampled = new byte[][] {
+        _100, _100, _100, _100 , _200, _200, _200, _300, _300, _300
+    };
+    assertKeys(sampled, _100, _200, _300); // 3, 5, 8
+
+    sampled = new byte[][] {
+        _100, _200, _200, _200, _200, _200, _200, _300, _300, _400
+    };
+    assertKeys(sampled, _200, _300, _400); // 3, 7, 9
+
+    sampled = new byte[][] {
+        _100, _200, _300, _400, _400, _400, _400, _400, _400, _400
+    };
+    assertKeys(sampled, _400);  // 3
+  }
+
+  private void assertKeys(byte[][] sampled, byte[]... expected) {
+    byte[][] keys = PartitionKeySampler.toPartitionKeys(sampled, 4);
+    assertEquals(expected.length, keys.length);
+    for (int i = 0; i < expected.length; i++) {
+      assertTrue(Arrays.equals(expected[i], keys[i]));
+    }
+  }
+}