You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2020/05/18 17:19:34 UTC
[hive] branch master updated: HIVE-23178: Add Tez Total Order Partitioner (addendum, whitespace fixes)
This is an automated email from the ASF dual-hosted git repository.
gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ece43ea HIVE-23178: Add Tez Total Order Partitioner (addendum, whitespace fixes)
ece43ea is described below
commit ece43eaa369d7224c47add8e2c42891c73408cc4
Author: Gopal V <go...@apache.org>
AuthorDate: Mon May 18 10:19:09 2020 -0700
HIVE-23178: Add Tez Total Order Partitioner (addendum, whitespace fixes)
Signed-off-by: Gopal V <go...@apache.org>
---
.../hive/ql/exec/tez/TezTotalOrderPartitioner.java | 82 +++---
.../ql/exec/tez/TestTezTotalOrderPartitioner.java | 315 +++++++++++----------
2 files changed, 201 insertions(+), 196 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java
index c66d964..4596c02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java
@@ -35,52 +35,52 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
public class TezTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {
- private static final Logger LOG = LoggerFactory.getLogger(TezTotalOrderPartitioner.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TezTotalOrderPartitioner.class);
- private Partitioner<HiveKey, Object> partitioner;
+ private Partitioner<HiveKey, Object> partitioner;
- private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
- public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = TEZ_RUNTIME_FRAMEWORK_PREFIX
- + "num.expected.partitions";
+ private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
+ public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS =
+ TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions";
- @Override
- public void configure(JobConf job) {
- if (partitioner == null) {
- configurePartitioner(new JobConf(job));
- }
- }
+ @Override
+ public void configure(JobConf job) {
+ if (partitioner == null) {
+ configurePartitioner(new JobConf(job));
+ }
+ }
- @Override
- public void setConf(Configuration conf) {
- // walk-around of TEZ-1403
- if (partitioner == null) {
- configurePartitioner(new JobConf(conf));
- }
- }
+ @Override
+ public void setConf(Configuration conf) {
+ // walk-around of TEZ-1403
+ if (partitioner == null) {
+ configurePartitioner(new JobConf(conf));
+ }
+ }
- public int getPartition(HiveKey key, Object value, int numPartitions) {
- return partitioner.getPartition(key, value, numPartitions);
- }
+ public int getPartition(HiveKey key, Object value, int numPartitions) {
+ return partitioner.getPartition(key, value, numPartitions);
+ }
- @Override
- public Configuration getConf() {
- return null;
- }
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
- private void configurePartitioner(JobConf conf) {
- LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
- // make the HiveKey assumption
- conf.setMapOutputKeyClass(HiveKey.class);
- LOG.info(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
- // remove the Tez fast serialization factory (TEZ-1288)
- // this one skips the len prefix, so that the sorter can assume byte-order ==
- // sort-order
- conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName(),
- WritableSerialization.class.getName());
- int partitions = conf.getInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, -1);
- // get the tez partitioning and feed it into the MR config
- conf.setInt(MRJobConfig.NUM_REDUCES, partitions);
- partitioner = new TotalOrderPartitioner<HiveKey, Object>();
- partitioner.configure(conf);
- }
+ private void configurePartitioner(JobConf conf) {
+ LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+ // make the HiveKey assumption
+ conf.setMapOutputKeyClass(HiveKey.class);
+ LOG.info(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+ // remove the Tez fast serialization factory (TEZ-1288)
+ // this one skips the len prefix, so that the sorter can assume byte-order ==
+ // sort-order
+ conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+ JavaSerialization.class.getName(), WritableSerialization.class.getName());
+ int partitions = conf.getInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, -1);
+ // get the tez partitioning and feed it into the MR config
+ conf.setInt(MRJobConfig.NUM_REDUCES, partitions);
+ partitioner = new TotalOrderPartitioner<HiveKey, Object>();
+ partitioner.configure(conf);
+ }
}
\ No newline at end of file
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java
index 363487e..cc16fd4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java
@@ -48,159 +48,164 @@ import static org.junit.Assert.assertEquals;
public class TestTezTotalOrderPartitioner {
- public static final String PARTITIONER_PATH = "mapreduce.totalorderpartitioner.path";
- private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
-
- public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions";
-
- private static final int LENGTH_BYTES = 4;
-
- private static final HiveKey[] splitStrings = new HiveKey[] {
- // -inf // 0
- new HiveKey("aabbb".getBytes()), // 1
- new HiveKey("babbb".getBytes()), // 2
- new HiveKey("daddd".getBytes()), // 3
- new HiveKey("dddee".getBytes()), // 4
- new HiveKey("ddhee".getBytes()), // 5
- new HiveKey("dingo".getBytes()), // 6
- new HiveKey("hijjj".getBytes()), // 7
- new HiveKey("n".getBytes()), // 8
- new HiveKey("yak".getBytes()), // 9
- };
-
- static class Check<T> {
- T data;
- int part;
-
- Check(T data, int part) {
- this.data = data;
- this.part = part;
- }
- }
-
- private static final ArrayList<Check<HiveKey>> testStrings = new ArrayList<Check<HiveKey>>();
- static {
- testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
- testStrings.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 0));
- testStrings.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 1));
- testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
- testStrings.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 2));
- testStrings.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 1));
- testStrings.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 8));
- testStrings.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 9));
- testStrings.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 9));
- testStrings.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 5));
- testStrings.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 6));
- };
-
- private static <T> Path writePartitionFile(String testname, Configuration conf, T[] splits) throws IOException {
- final FileSystem fs = FileSystem.getLocal(conf);
- final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(fs.getUri(),
- fs.getWorkingDirectory());
- Path p = new Path(testdir, testname + "/_partition.lst");
- conf.set(PARTITIONER_PATH, p.toString());
- conf.setInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, splits.length + 1);
- SequenceFile.Writer w = null;
- try {
- w = SequenceFile.createWriter(conf, SequenceFile.Writer.file(p),
- SequenceFile.Writer.keyClass(HiveKey.class), SequenceFile.Writer.valueClass(NullWritable.class),
- SequenceFile.Writer.compression(CompressionType.NONE));
- for (int i = 0; i < splits.length; ++i) {
- w.append(splits[i], NullWritable.get());
- }
- } finally {
- if (null != w)
- w.close();
- }
- return p;
- }
-
- @Test
- public void testTotalOrderMemCmp() throws Exception {
- TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
- Configuration conf = new Configuration();
- Path p = TestTezTotalOrderPartitioner.<HiveKey>writePartitionFile("totalordermemcmp", conf, splitStrings);
- try {
- partitioner.configure(new JobConf(conf));
- NullWritable nw = NullWritable.get();
- for (Check<HiveKey> chk : testStrings) {
- assertEquals(chk.data.toString(), chk.part,
- partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
- }
- } finally {
- p.getFileSystem(conf).delete(p, true);
- }
- }
-
- @Test
- public void testTotalOrderBinarySearch() throws Exception {
- TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
- Configuration conf = new Configuration();
- Path p = TestTezTotalOrderPartitioner.<HiveKey>writePartitionFile("totalorderbinarysearch", conf, splitStrings);
- conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
-
- try {
- partitioner.configure(new JobConf(conf));
- NullWritable nw = NullWritable.get();
- for (Check<HiveKey> chk : testStrings) {
- assertEquals(chk.data.toString(), chk.part,
- partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
- }
- } finally {
- p.getFileSystem(conf).delete(p, true);
- }
- }
-
-
- /** A Comparator optimized for HiveKey. */
- public static class ReverseHiveKeyComparator implements RawComparator<HiveKey> {
-
- /**
- * Compare the buffers in serialized form.
- */
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- int c = -1*WritableComparator.compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
- return c;
- }
-
- @Override
- public int compare(HiveKey o1, HiveKey o2) {
- return -o1.compareTo(o2);
- }
- }
-
- @Test
- public void testTotalOrderCustomComparator() throws Exception {
- TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
- Configuration conf = new Configuration();
- HiveKey[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
- Arrays.sort(revSplitStrings, new ReverseHiveKeyComparator());
- Path p = TestTezTotalOrderPartitioner.<HiveKey>writePartitionFile("totalordercustomcomparator", conf,
- revSplitStrings);
- conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
- conf.setClass(MRJobConfig.KEY_COMPARATOR, ReverseHiveKeyComparator.class, RawComparator.class);
- ArrayList<Check<HiveKey>> revCheck = new ArrayList<Check<HiveKey>>();
- revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
- revCheck.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 9));
- revCheck.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 9));
- revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
- revCheck.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 8));
- revCheck.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 8));
- revCheck.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 1));
- revCheck.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 1));
- revCheck.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 0));
- revCheck.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 4));
- revCheck.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 3));
- try {
- partitioner.configure(new JobConf(conf));
- NullWritable nw = NullWritable.get();
- for (Check<HiveKey> chk : revCheck) {
- assertEquals(chk.data.toString(), chk.part,
- partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
- }
- } finally {
- p.getFileSystem(conf).delete(p, true);
- }
- }
+ public static final String PARTITIONER_PATH = "mapreduce.totalorderpartitioner.path";
+ private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
+
+ public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS =
+ TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions";
+
+ private static final int LENGTH_BYTES = 4;
+
+ private static final HiveKey[] splitStrings = new HiveKey[] {
+ // -inf // 0
+ new HiveKey("aabbb".getBytes()), // 1
+ new HiveKey("babbb".getBytes()), // 2
+ new HiveKey("daddd".getBytes()), // 3
+ new HiveKey("dddee".getBytes()), // 4
+ new HiveKey("ddhee".getBytes()), // 5
+ new HiveKey("dingo".getBytes()), // 6
+ new HiveKey("hijjj".getBytes()), // 7
+ new HiveKey("n".getBytes()), // 8
+ new HiveKey("yak".getBytes()), // 9
+ };
+
+ static class Check<T> {
+ T data;
+ int part;
+
+ Check(T data, int part) {
+ this.data = data;
+ this.part = part;
+ }
+ }
+
+ private static final ArrayList<Check<HiveKey>> testStrings = new ArrayList<Check<HiveKey>>();
+ static {
+ testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
+ testStrings.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 0));
+ testStrings.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 1));
+ testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
+ testStrings.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 2));
+ testStrings.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 1));
+ testStrings.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 8));
+ testStrings.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 9));
+ testStrings.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 9));
+ testStrings.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 5));
+ testStrings.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 6));
+ };
+
+ private static <T> Path writePartitionFile(String testname, Configuration conf, T[] splits)
+ throws IOException {
+ final FileSystem fs = FileSystem.getLocal(conf);
+ final Path testdir = new Path(System.getProperty("test.build.data", "/tmp"))
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ Path p = new Path(testdir, testname + "/_partition.lst");
+ conf.set(PARTITIONER_PATH, p.toString());
+ conf.setInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, splits.length + 1);
+ SequenceFile.Writer w = null;
+ try {
+ w = SequenceFile.createWriter(conf, SequenceFile.Writer.file(p),
+ SequenceFile.Writer.keyClass(HiveKey.class),
+ SequenceFile.Writer.valueClass(NullWritable.class),
+ SequenceFile.Writer.compression(CompressionType.NONE));
+ for (int i = 0; i < splits.length; ++i) {
+ w.append(splits[i], NullWritable.get());
+ }
+ } finally {
+ if (null != w)
+ w.close();
+ }
+ return p;
+ }
+
+ @Test
+ public void testTotalOrderMemCmp() throws Exception {
+ TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
+ Configuration conf = new Configuration();
+ Path p = TestTezTotalOrderPartitioner.<HiveKey> writePartitionFile("totalordermemcmp", conf,
+ splitStrings);
+ try {
+ partitioner.configure(new JobConf(conf));
+ NullWritable nw = NullWritable.get();
+ for (Check<HiveKey> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+
+ @Test
+ public void testTotalOrderBinarySearch() throws Exception {
+ TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
+ Configuration conf = new Configuration();
+ Path p = TestTezTotalOrderPartitioner.<HiveKey> writePartitionFile("totalorderbinarysearch",
+ conf, splitStrings);
+ conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+
+ try {
+ partitioner.configure(new JobConf(conf));
+ NullWritable nw = NullWritable.get();
+ for (Check<HiveKey> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+
+ /** A Comparator optimized for HiveKey. */
+ public static class ReverseHiveKeyComparator implements RawComparator<HiveKey> {
+
+ /**
+ * Compare the buffers in serialized form.
+ */
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int c = -1 * WritableComparator.compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2,
+ s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+ return c;
+ }
+
+ @Override
+ public int compare(HiveKey o1, HiveKey o2) {
+ return -o1.compareTo(o2);
+ }
+ }
+
+ @Test
+ public void testTotalOrderCustomComparator() throws Exception {
+ TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
+ Configuration conf = new Configuration();
+ HiveKey[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
+ Arrays.sort(revSplitStrings, new ReverseHiveKeyComparator());
+ Path p = TestTezTotalOrderPartitioner.<HiveKey> writePartitionFile("totalordercustomcomparator",
+ conf, revSplitStrings);
+ conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+ conf.setClass(MRJobConfig.KEY_COMPARATOR, ReverseHiveKeyComparator.class, RawComparator.class);
+ ArrayList<Check<HiveKey>> revCheck = new ArrayList<Check<HiveKey>>();
+ revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
+ revCheck.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 9));
+ revCheck.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 9));
+ revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
+ revCheck.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 8));
+ revCheck.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 8));
+ revCheck.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 1));
+ revCheck.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 1));
+ revCheck.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 0));
+ revCheck.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 4));
+ revCheck.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 3));
+ try {
+ partitioner.configure(new JobConf(conf));
+ NullWritable nw = NullWritable.get();
+ for (Check<HiveKey> chk : revCheck) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
}
\ No newline at end of file