You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2020/05/18 17:19:34 UTC

[hive] branch master updated: HIVE-23178: Add Tez Total Order Partitioner (addendum, whitespace fixes)

This is an automated email from the ASF dual-hosted git repository.

gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ece43ea  HIVE-23178: Add Tez Total Order Partitioner (addendum, whitespace fixes)
ece43ea is described below

commit ece43eaa369d7224c47add8e2c42891c73408cc4
Author: Gopal V <go...@apache.org>
AuthorDate: Mon May 18 10:19:09 2020 -0700

    HIVE-23178: Add Tez Total Order Partitioner (addendum, whitespace fixes)
    
    Signed-off-by: Gopal V <go...@apache.org>
---
 .../hive/ql/exec/tez/TezTotalOrderPartitioner.java |  82 +++---
 .../ql/exec/tez/TestTezTotalOrderPartitioner.java  | 315 +++++++++++----------
 2 files changed, 201 insertions(+), 196 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java
index c66d964..4596c02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java
@@ -35,52 +35,52 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 
 public class TezTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TezTotalOrderPartitioner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TezTotalOrderPartitioner.class);
 
-	private Partitioner<HiveKey, Object> partitioner;
+  private Partitioner<HiveKey, Object> partitioner;
 
-	private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
-	public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = TEZ_RUNTIME_FRAMEWORK_PREFIX
-			+ "num.expected.partitions";
+  private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
+  public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS =
+      TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions";
 
-	@Override
-	public void configure(JobConf job) {
-		if (partitioner == null) {
-			configurePartitioner(new JobConf(job));
-		}
-	}
+  @Override
+  public void configure(JobConf job) {
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(job));
+    }
+  }
 
-	@Override
-	public void setConf(Configuration conf) {
-		// walk-around of TEZ-1403
-		if (partitioner == null) {
-			configurePartitioner(new JobConf(conf));
-		}
-	}
+  @Override
+  public void setConf(Configuration conf) {
+    // workaround for TEZ-1403
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(conf));
+    }
+  }
 
-	public int getPartition(HiveKey key, Object value, int numPartitions) {
-		return partitioner.getPartition(key, value, numPartitions);
-	}
+  public int getPartition(HiveKey key, Object value, int numPartitions) {
+    return partitioner.getPartition(key, value, numPartitions);
+  }
 
-	@Override
-	public Configuration getConf() {
-		return null;
-	}
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
 
-	private void configurePartitioner(JobConf conf) {
-		LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
-		// make the HiveKey assumption
-		conf.setMapOutputKeyClass(HiveKey.class);
-		LOG.info(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-		// remove the Tez fast serialization factory (TEZ-1288)
-		// this one skips the len prefix, so that the sorter can assume byte-order ==
-		// sort-order
-		conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName(),
-				WritableSerialization.class.getName());
-		int partitions = conf.getInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, -1);
-		// get the tez partitioning and feed it into the MR config
-		conf.setInt(MRJobConfig.NUM_REDUCES, partitions);
-		partitioner = new TotalOrderPartitioner<HiveKey, Object>();
-		partitioner.configure(conf);
-	}
+  private void configurePartitioner(JobConf conf) {
+    LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+    // make the HiveKey assumption
+    conf.setMapOutputKeyClass(HiveKey.class);
+    LOG.info(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+    // remove the Tez fast serialization factory (TEZ-1288)
+    // this one skips the len prefix, so that the sorter can assume byte-order ==
+    // sort-order
+    conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName(), WritableSerialization.class.getName());
+    int partitions = conf.getInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, -1);
+    // get the tez partitioning and feed it into the MR config
+    conf.setInt(MRJobConfig.NUM_REDUCES, partitions);
+    partitioner = new TotalOrderPartitioner<HiveKey, Object>();
+    partitioner.configure(conf);
+  }
 }
\ No newline at end of file
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java
index 363487e..cc16fd4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java
@@ -48,159 +48,164 @@ import static org.junit.Assert.assertEquals;
 
 public class TestTezTotalOrderPartitioner {
 
-	public static final String PARTITIONER_PATH = "mapreduce.totalorderpartitioner.path";
-	private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
-	
-	public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions";
-
-	private static final int LENGTH_BYTES = 4;
-
-	private static final HiveKey[] splitStrings = new HiveKey[] {
-			// -inf // 0
-			new HiveKey("aabbb".getBytes()), // 1
-			new HiveKey("babbb".getBytes()), // 2
-			new HiveKey("daddd".getBytes()), // 3
-			new HiveKey("dddee".getBytes()), // 4
-			new HiveKey("ddhee".getBytes()), // 5
-			new HiveKey("dingo".getBytes()), // 6
-			new HiveKey("hijjj".getBytes()), // 7
-			new HiveKey("n".getBytes()), // 8
-			new HiveKey("yak".getBytes()), // 9
-	};
-
-	static class Check<T> {
-		T data;
-		int part;
-
-		Check(T data, int part) {
-			this.data = data;
-			this.part = part;
-		}
-	}
-
-	private static final ArrayList<Check<HiveKey>> testStrings = new ArrayList<Check<HiveKey>>();
-	static {
-		testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
-		testStrings.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 0));
-		testStrings.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 1));
-		testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
-		testStrings.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 2));
-		testStrings.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 1));
-		testStrings.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 8));
-		testStrings.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 9));
-		testStrings.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 9));
-		testStrings.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 5));
-		testStrings.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 6));
-	};
-
-	private static <T> Path writePartitionFile(String testname, Configuration conf, T[] splits) throws IOException {
-		final FileSystem fs = FileSystem.getLocal(conf);
-		final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(fs.getUri(),
-				fs.getWorkingDirectory());
-		Path p = new Path(testdir, testname + "/_partition.lst");
-		conf.set(PARTITIONER_PATH, p.toString());
-		conf.setInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, splits.length + 1);
-		SequenceFile.Writer w = null;
-		try {
-			w = SequenceFile.createWriter(conf, SequenceFile.Writer.file(p),
-					SequenceFile.Writer.keyClass(HiveKey.class), SequenceFile.Writer.valueClass(NullWritable.class),
-					SequenceFile.Writer.compression(CompressionType.NONE));
-			for (int i = 0; i < splits.length; ++i) {
-				w.append(splits[i], NullWritable.get());
-			}
-		} finally {
-			if (null != w)
-				w.close();
-		}
-		return p;
-	}
-
-	@Test
-	public void testTotalOrderMemCmp() throws Exception {
-		TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
-		Configuration conf = new Configuration();
-		Path p = TestTezTotalOrderPartitioner.<HiveKey>writePartitionFile("totalordermemcmp", conf, splitStrings);
-		try {
-			partitioner.configure(new JobConf(conf));
-			NullWritable nw = NullWritable.get();
-			for (Check<HiveKey> chk : testStrings) {
-				assertEquals(chk.data.toString(), chk.part,
-						partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
-			}
-		} finally {
-			p.getFileSystem(conf).delete(p, true);
-		}
-	}
-
-	@Test
-	public void testTotalOrderBinarySearch() throws Exception {
-		TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
-		Configuration conf = new Configuration();
-		Path p = TestTezTotalOrderPartitioner.<HiveKey>writePartitionFile("totalorderbinarysearch", conf, splitStrings);
-		conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
-		
-		try {
-			partitioner.configure(new JobConf(conf));
-			NullWritable nw = NullWritable.get();
-			for (Check<HiveKey> chk : testStrings) {
-				assertEquals(chk.data.toString(), chk.part,
-						partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
-			}
-		} finally {
-			p.getFileSystem(conf).delete(p, true);
-		}
-	}
-
-
-	/** A Comparator optimized for HiveKey. */
-	public static class ReverseHiveKeyComparator implements RawComparator<HiveKey> {
-		
-		/**
-		 * Compare the buffers in serialized form.
-		 */
-		@Override
-		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-			int c = -1*WritableComparator.compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
-			return c;
-		}
-
-		@Override
-		public int compare(HiveKey o1, HiveKey o2) {
-			return -o1.compareTo(o2);
-		}
-	}
-
-	@Test
-	public void testTotalOrderCustomComparator() throws Exception {
-		TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
-		Configuration conf = new Configuration();
-		HiveKey[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
-		Arrays.sort(revSplitStrings, new ReverseHiveKeyComparator());
-		Path p = TestTezTotalOrderPartitioner.<HiveKey>writePartitionFile("totalordercustomcomparator", conf,
-				revSplitStrings);
-		conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
-		conf.setClass(MRJobConfig.KEY_COMPARATOR, ReverseHiveKeyComparator.class, RawComparator.class);
-		ArrayList<Check<HiveKey>> revCheck = new ArrayList<Check<HiveKey>>();
-		revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
-		revCheck.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 9));
-		revCheck.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 9));
-		revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
-		revCheck.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 8));
-		revCheck.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 8));
-		revCheck.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 1));
-		revCheck.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 1));
-		revCheck.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 0));
-		revCheck.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 4));
-		revCheck.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 3));
-		try {
-			partitioner.configure(new JobConf(conf));
-			NullWritable nw = NullWritable.get();
-			for (Check<HiveKey> chk : revCheck) {
-				assertEquals(chk.data.toString(), chk.part,
-						partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
-			}
-		} finally {
-			p.getFileSystem(conf).delete(p, true);
-		}
-	}
+  public static final String PARTITIONER_PATH = "mapreduce.totalorderpartitioner.path";
+  private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
+
+  public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS =
+      TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions";
+
+  private static final int LENGTH_BYTES = 4;
+
+  private static final HiveKey[] splitStrings = new HiveKey[] {
+      // -inf // 0
+      new HiveKey("aabbb".getBytes()), // 1
+      new HiveKey("babbb".getBytes()), // 2
+      new HiveKey("daddd".getBytes()), // 3
+      new HiveKey("dddee".getBytes()), // 4
+      new HiveKey("ddhee".getBytes()), // 5
+      new HiveKey("dingo".getBytes()), // 6
+      new HiveKey("hijjj".getBytes()), // 7
+      new HiveKey("n".getBytes()), // 8
+      new HiveKey("yak".getBytes()), // 9
+  };
+
+  static class Check<T> {
+    T data;
+    int part;
+
+    Check(T data, int part) {
+      this.data = data;
+      this.part = part;
+    }
+  }
+
+  private static final ArrayList<Check<HiveKey>> testStrings = new ArrayList<Check<HiveKey>>();
+  static {
+    testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
+    testStrings.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 0));
+    testStrings.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 1));
+    testStrings.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 0));
+    testStrings.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 2));
+    testStrings.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 1));
+    testStrings.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 8));
+    testStrings.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 9));
+    testStrings.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 9));
+    testStrings.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 5));
+    testStrings.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 6));
+  };
+
+  private static <T> Path writePartitionFile(String testname, Configuration conf, T[] splits)
+      throws IOException {
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final Path testdir = new Path(System.getProperty("test.build.data", "/tmp"))
+        .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path p = new Path(testdir, testname + "/_partition.lst");
+    conf.set(PARTITIONER_PATH, p.toString());
+    conf.setInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, splits.length + 1);
+    SequenceFile.Writer w = null;
+    try {
+      w = SequenceFile.createWriter(conf, SequenceFile.Writer.file(p),
+          SequenceFile.Writer.keyClass(HiveKey.class),
+          SequenceFile.Writer.valueClass(NullWritable.class),
+          SequenceFile.Writer.compression(CompressionType.NONE));
+      for (int i = 0; i < splits.length; ++i) {
+        w.append(splits[i], NullWritable.get());
+      }
+    } finally {
+      if (null != w)
+        w.close();
+    }
+    return p;
+  }
+
+  @Test
+  public void testTotalOrderMemCmp() throws Exception {
+    TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
+    Configuration conf = new Configuration();
+    Path p = TestTezTotalOrderPartitioner.<HiveKey> writePartitionFile("totalordermemcmp", conf,
+        splitStrings);
+    try {
+      partitioner.configure(new JobConf(conf));
+      NullWritable nw = NullWritable.get();
+      for (Check<HiveKey> chk : testStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
+
+  @Test
+  public void testTotalOrderBinarySearch() throws Exception {
+    TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
+    Configuration conf = new Configuration();
+    Path p = TestTezTotalOrderPartitioner.<HiveKey> writePartitionFile("totalorderbinarysearch",
+        conf, splitStrings);
+    conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+
+    try {
+      partitioner.configure(new JobConf(conf));
+      NullWritable nw = NullWritable.get();
+      for (Check<HiveKey> chk : testStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
+
+  /** A RawComparator that orders HiveKeys in reverse by negating the underlying comparison. */
+  public static class ReverseHiveKeyComparator implements RawComparator<HiveKey> {
+
+    /**
+     * Compare the serialized buffers in reverse order, skipping the first LENGTH_BYTES bytes of each.
+     */
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int c = -1 * WritableComparator.compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2,
+          s2 + LENGTH_BYTES, l2 - LENGTH_BYTES);
+      return c;
+    }
+
+    @Override
+    public int compare(HiveKey o1, HiveKey o2) {
+      return -o1.compareTo(o2);
+    }
+  }
+
+  @Test
+  public void testTotalOrderCustomComparator() throws Exception {
+    TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner();
+    Configuration conf = new Configuration();
+    HiveKey[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
+    Arrays.sort(revSplitStrings, new ReverseHiveKeyComparator());
+    Path p = TestTezTotalOrderPartitioner.<HiveKey> writePartitionFile("totalordercustomcomparator",
+        conf, revSplitStrings);
+    conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+    conf.setClass(MRJobConfig.KEY_COMPARATOR, ReverseHiveKeyComparator.class, RawComparator.class);
+    ArrayList<Check<HiveKey>> revCheck = new ArrayList<Check<HiveKey>>();
+    revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
+    revCheck.add(new Check<HiveKey>(new HiveKey("aaabb".getBytes()), 9));
+    revCheck.add(new Check<HiveKey>(new HiveKey("aabbb".getBytes()), 9));
+    revCheck.add(new Check<HiveKey>(new HiveKey("aaaaa".getBytes()), 9));
+    revCheck.add(new Check<HiveKey>(new HiveKey("babbb".getBytes()), 8));
+    revCheck.add(new Check<HiveKey>(new HiveKey("baabb".getBytes()), 8));
+    revCheck.add(new Check<HiveKey>(new HiveKey("yai".getBytes()), 1));
+    revCheck.add(new Check<HiveKey>(new HiveKey("yak".getBytes()), 1));
+    revCheck.add(new Check<HiveKey>(new HiveKey("z".getBytes()), 0));
+    revCheck.add(new Check<HiveKey>(new HiveKey("ddngo".getBytes()), 4));
+    revCheck.add(new Check<HiveKey>(new HiveKey("hi".getBytes()), 3));
+    try {
+      partitioner.configure(new JobConf(conf));
+      NullWritable nw = NullWritable.get();
+      for (Check<HiveKey> chk : revCheck) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
 }
\ No newline at end of file