You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by my...@apache.org on 2017/09/13 12:13:14 UTC
[4/5] incubator-hivemall git commit: Refactored to_ordered_list and
to_ordered_map UDAFs
Refactored the to_ordered_list and to_ordered_map UDAFs
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/69730f65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/69730f65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/69730f65
Branch: refs/heads/master
Commit: 69730f65d76b54890b141ee13567eb681c0374ae
Parents: 21a8781
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:26:49 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:27:23 2017 +0900
----------------------------------------------------------------------
.../hivemall/tools/list/UDAFToOrderedList.java | 116 +++++-----
.../hivemall/tools/map/UDAFToOrderedMap.java | 221 +++----------------
.../tools/list/UDAFToOrderedListTest.java | 32 ++-
.../tools/map/UDAFToOrderedMapTest.java | 40 ++--
4 files changed, 132 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
index 16c966a..e88a16c 100644
--- a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
+++ b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
@@ -21,6 +21,23 @@ package hivemall.tools.list;
import hivemall.utils.collections.BoundedPriorityQueue;
import hivemall.utils.hadoop.HiveUtils;
import hivemall.utils.lang.CommandLineUtils;
+import hivemall.utils.lang.NaturalComparator;
+import hivemall.utils.lang.Preconditions;
+import hivemall.utils.struct.Pair;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -33,25 +50,26 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.*;
-
/**
* Return list of values sorted by value itself or specific key.
*/
-@Description(
- name = "to_ordered_list",
- value = "_FUNC_(value [, key, const string options]) - Return list of values sorted by value itself or specific key")
-public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
+@Description(name = "to_ordered_list",
+ value = "_FUNC_(PRIMITIVE value [, PRIMITIVE key, const string options])"
+ + " - Return list of values sorted by value itself or specific key")
+public final class UDAFToOrderedList extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
@@ -151,7 +169,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
int k = 0;
boolean reverseOrder = false;
-
if (argOIs.length >= optionIndex + 1) {
String rawArgs = HiveUtils.getConstString(argOIs[optionIndex]);
cl = parseOptions(rawArgs);
@@ -161,18 +178,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
if (cl.hasOption("k")) {
k = Integer.parseInt(cl.getOptionValue("k"));
if (k == 0) {
- throw new UDFArgumentException("`k` must be nonzero: " + k);
+ throw new UDFArgumentException("`k` must be non-zero value: " + k);
}
}
}
-
this.size = Math.abs(k);
- if ((k > 0 && reverseOrder) || (k < 0 && !reverseOrder) || (k == 0 && !reverseOrder)) {
- // reverse top-k, natural tail-k = ascending = natural order output = reverse order priority queue
+ if ((k > 0 && reverseOrder) || (k < 0 && reverseOrder == false)
+ || (k == 0 && reverseOrder == false)) {
+ // top-k on reverse order = tail-k on natural order (so, top-k on descending)
this.reverseOrder = true;
- } else { // (k > 0 && !reverseOrder) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
- // natural top-k or reverse tail-k = descending = reverse order output = natural order priority queue
+ } else { // (k > 0 && reverseOrder == false) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
+ // top-k on natural order = tail-k on reverse order (so, top-k on ascending)
this.reverseOrder = false;
}
@@ -190,7 +207,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
|| (argOIs.length == 3 && HiveUtils.isConstString(argOIs[2]));
if (sortByKey) {
- this.valueOI = argOIs[0];
+ this.valueOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[1]);
} else {
// sort values by value itself
@@ -230,20 +247,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
return outputOI;
}
+ @Nonnull
private static StructObjectInspector internalMergeOI(@Nonnull ObjectInspector valueOI,
@Nonnull PrimitiveObjectInspector keyOI) {
- ArrayList<String> fieldNames = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ List<String> fieldNames = new ArrayList<String>();
+ List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("valueList");
fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
fieldNames.add("keyList");
fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(keyOI)));
-
fieldNames.add("size");
fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
fieldNames.add("reverseOrder");
fieldOIs.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
@@ -295,10 +310,10 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
throws HiveException {
QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
- Map<String, List<Object>> tuples = myagg.drainQueue();
- List<Object> valueList = tuples.get("value");
- List<Object> keyList = tuples.get("key");
- if (valueList.size() == 0) {
+ Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+ List<Object> keyList = tuples.getKey();
+ List<Object> valueList = tuples.getValue();
+ if (valueList.isEmpty()) {
return null;
}
@@ -307,7 +322,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
partialResult[1] = keyList;
partialResult[2] = new IntWritable(myagg.size);
partialResult[3] = new BooleanWritable(myagg.reverseOrder);
-
return partialResult;
}
@@ -345,11 +359,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
@Override
- public Object terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+ public List<Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
- Map<String, List<Object>> tuples = myagg.drainQueue();
- return tuples.get("value");
+ Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+ return tuples.getValue();
}
static class QueueAggregationBuffer extends AbstractAggregationBuffer {
@@ -374,14 +388,14 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
this.reverseOrder = reverseOrder;
}
- void iterate(TupleWithKey tuple) {
+ void iterate(@Nonnull TupleWithKey tuple) {
if (queueHandler == null) {
initQueueHandler();
}
queueHandler.offer(tuple);
}
- void merge(List<Object> o_keyList, List<Object> o_valueList) {
+ void merge(@Nonnull List<Object> o_keyList, @Nonnull List<Object> o_valueList) {
if (queueHandler == null) {
initQueueHandler();
}
@@ -391,7 +405,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
@Nonnull
- Map<String, List<Object>> drainQueue() {
+ Pair<List<Object>, List<Object>> drainQueue() {
int n = queueHandler.size();
final Object[] keys = new Object[n];
final Object[] values = new Object[n];
@@ -402,10 +416,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
queueHandler.clear();
- Map<String, List<Object>> res = new HashMap<String, List<Object>>();
- res.put("key", Arrays.asList(keys));
- res.put("value", Arrays.asList(values));
- return res;
+ return Pair.of(Arrays.asList(keys), Arrays.asList(values));
}
private void initQueueHandler() {
@@ -413,12 +424,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
if (reverseOrder) {
comparator = Collections.reverseOrder();
} else {
- comparator = new Comparator<TupleWithKey>() {
- @Override
- public int compare(TupleWithKey o1, TupleWithKey o2) {
- return o1.compareTo(o2);
- }
- };
+ comparator = NaturalComparator.getInstance();
}
if (size > 0) {
@@ -436,10 +442,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
*/
private static abstract class AbstractQueueHandler {
- abstract void offer(TupleWithKey tuple);
+ abstract void offer(@Nonnull TupleWithKey tuple);
abstract int size();
+ @Nullable
abstract TupleWithKey poll();
abstract void clear();
@@ -450,6 +457,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
private static final int DEFAULT_INITIAL_CAPACITY = 11; // same as PriorityQueue
+ @Nonnull
private final PriorityQueue<TupleWithKey> queue;
QueueHandler(@Nonnull Comparator<TupleWithKey> comparator) {
@@ -480,6 +488,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
private static final class BoundedQueueHandler extends AbstractQueueHandler {
+ @Nonnull
private final BoundedPriorityQueue<TupleWithKey> queue;
BoundedQueueHandler(int size, @Nonnull Comparator<TupleWithKey> comparator) {
@@ -509,24 +518,29 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
private static final class TupleWithKey implements Comparable<TupleWithKey> {
- private Object key;
- private Object value;
+ @Nonnull
+ private final Object key;
+ @Nonnull
+ private final Object value;
- TupleWithKey(Object key, Object value) {
- this.key = key;
- this.value = value;
+ TupleWithKey(@CheckForNull Object key, @CheckForNull Object value) {
+ this.key = Preconditions.checkNotNull(key);
+ this.value = Preconditions.checkNotNull(value);
}
+ @Nonnull
Object getKey() {
return key;
}
+ @Nonnull
Object getValue() {
return value;
}
@Override
public int compareTo(TupleWithKey o) {
+ @SuppressWarnings("unchecked")
Comparable<? super Object> k = (Comparable<? super Object>) key;
return k.compareTo(o.getKey());
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index 3e6caa4..a6b547f 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -18,14 +18,14 @@
*/
package hivemall.tools.map;
+import hivemall.utils.collections.maps.BoundedSortedMap;
import hivemall.utils.hadoop.HiveUtils;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Map;
-import java.util.SortedMap;
import java.util.TreeMap;
+import javax.annotation.Nonnegative;
+
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
@@ -34,17 +34,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.IntWritable;
-
-import javax.annotation.Nonnull;
/**
* Convert two aggregated columns into a sorted key-value map.
@@ -52,13 +42,13 @@ import javax.annotation.Nonnull;
@Description(name = "to_ordered_map",
value = "_FUNC_(key, value [, const int k|const boolean reverseOrder=false]) "
+ "- Convert two aggregated columns into an ordered key-value map")
-public class UDAFToOrderedMap extends UDAFToMap {
+public final class UDAFToOrderedMap extends UDAFToMap {
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
throws SemanticException {
@SuppressWarnings("deprecation")
- TypeInfo[] typeInfo = info.getParameters();
+ final TypeInfo[] typeInfo = info.getParameters();
if (typeInfo.length != 2 && typeInfo.length != 3) {
throw new UDFArgumentTypeException(typeInfo.length - 1,
"Expecting two or three arguments: " + typeInfo.length);
@@ -81,6 +71,7 @@ public class UDAFToOrderedMap extends UDAFToMap {
throw new UDFArgumentException("Map size must be nonzero: " + size);
}
reverseOrder = (size > 0); // positive size => top-k
+ size = Math.abs(size);
} else {
throw new UDFArgumentTypeException(2,
"The third argument must be boolean or integer type: "
@@ -89,205 +80,53 @@ public class UDAFToOrderedMap extends UDAFToMap {
}
if (reverseOrder) { // descending
- if (size != 0) {
- return new TopKOrderedMapEvaluator();
- }
- return new ReverseOrderedMapEvaluator();
+ return new DescendingMapEvaluator(size);
} else { // ascending
- if (size != 0) {
- return new TailKOrderedMapEvaluator();
- }
- return new NaturalOrderedMapEvaluator();
+ return new AscendingMapEvaluator(size);
}
}
- public static class NaturalOrderedMapEvaluator extends UDAFToMapEvaluator {
-
- @Override
- public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
- }
-
- }
+ public static final class AscendingMapEvaluator extends UDAFToMapEvaluator {
- public static class ReverseOrderedMapEvaluator extends UDAFToMapEvaluator {
+ private final int size;
- @Override
- public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
- Collections.reverseOrder());
- }
-
- }
-
- public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
-
- protected PrimitiveObjectInspector inputKeyOI;
- protected ObjectInspector inputValueOI;
- protected StandardMapObjectInspector partialMapOI;
- protected PrimitiveObjectInspector sizeOI;
-
- protected StructObjectInspector internalMergeOI;
-
- protected StructField partialMapField;
- protected StructField sizeField;
-
- @Override
- public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
- super.init(mode, argOIs);
-
- // initialize input
- if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
- this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
- this.inputValueOI = argOIs[1];
- this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
- } else {// from partial aggregation
- StructObjectInspector soi = (StructObjectInspector) argOIs[0];
- this.internalMergeOI = soi;
-
- this.partialMapField = soi.getStructFieldRef("partialMap");
- // re-extract input key/value OIs
- StandardMapObjectInspector partialMapOI = (StandardMapObjectInspector) partialMapField.getFieldObjectInspector();
- this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(partialMapOI.getMapKeyObjectInspector());
- this.inputValueOI = partialMapOI.getMapValueObjectInspector();
-
- this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
- ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
- ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
-
- this.sizeField = soi.getStructFieldRef("size");
- this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
- }
-
- // initialize output
- final ObjectInspector outputOI;
- if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
- outputOI = internalMergeOI(inputKeyOI, inputValueOI);
- } else {// terminate
- outputOI = ObjectInspectorFactory.getStandardMapObjectInspector(
- ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
- ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
- }
- return outputOI;
- }
-
- private static StructObjectInspector internalMergeOI(
- @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
- ArrayList<String> fieldNames = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-
- fieldNames.add("partialMap");
- fieldOIs.add(ObjectInspectorFactory.getStandardMapObjectInspector(
- ObjectInspectorUtils.getStandardObjectInspector(keyOI),
- ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
- fieldNames.add("size");
- fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
- return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
- }
-
- static class MapAggregationBuffer extends AbstractAggregationBuffer {
- Map<Object, Object> container;
- int size;
-
- MapAggregationBuffer() {
- super();
- }
+ AscendingMapEvaluator(@Nonnegative int size) {
+ super();
+ this.size = size;
}
@Override
public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- myagg.container = new TreeMap<Object, Object>(Collections.reverseOrder());
- myagg.size = Integer.MAX_VALUE;
- }
-
- @Override
- public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
- MapAggregationBuffer myagg = new MapAggregationBuffer();
- reset(myagg);
- return myagg;
- }
-
- @Override
- public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
- Object[] parameters) throws HiveException {
- assert (parameters.length == 3);
-
- if (parameters[0] == null) {
- return;
+ if (size == 0) {
+ ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
+ } else {
+ ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size);
}
-
- Object key = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputKeyOI);
- Object value = ObjectInspectorUtils.copyToStandardObject(parameters[1], inputValueOI);
- int size = Math.abs(HiveUtils.getInt(parameters[2], sizeOI)); // size could be negative for tail-k
-
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- myagg.container.put(key, value);
- myagg.size = size;
}
- @Override
- public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-
- Object[] partialResult = new Object[2];
- partialResult[0] = myagg.container;
- partialResult[1] = new IntWritable(myagg.size);
-
- return partialResult;
- }
+ }
- @Override
- public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
- throws HiveException {
- if (partial == null) {
- return;
- }
+ public static final class DescendingMapEvaluator extends UDAFToMapEvaluator {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+ private final int size;
- Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
- Map<?, ?> partialMap = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
- for (Map.Entry<?, ?> e : partialMap.entrySet()) {
- Object key = ObjectInspectorUtils.copyToStandardObject(e.getKey(), inputKeyOI);
- Object value = ObjectInspectorUtils.copyToStandardObject(e.getValue(), inputValueOI);
- myagg.container.put(key, value);
- }
-
- Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
- int size = HiveUtils.getInt(sizeObj, sizeOI);
- myagg.size = size;
+ DescendingMapEvaluator(int size) {
+ super();
+ this.size = size;
}
@Override
- public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- if (myagg.size < myagg.container.size()) {
- Object toKey = myagg.container.keySet().toArray()[myagg.size];
- return ((SortedMap<Object, Object>) myagg.container).headMap(toKey);
- }
- return myagg.container;
- }
-
- }
-
- public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
-
- @Override
public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- myagg.container = new TreeMap<Object, Object>();
- myagg.size = Integer.MAX_VALUE;
+ if (size == 0) {
+ ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
+ Collections.reverseOrder());
+ } else {
+ ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size,
+ true);
+ }
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
index f466dbc..c3039d1 100644
--- a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
+++ b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
@@ -21,22 +21,20 @@ package hivemall.tools.list;
import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator;
import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator.QueueAggregationBuffer;
+import java.util.List;
+
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.List;
-
-@SuppressWarnings("deprecation")
public class UDAFToOrderedListTest {
- GenericUDAFEvaluator evaluator;
- QueueAggregationBuffer agg;
+ private UDAFToOrderedListEvaluator evaluator;
+ private QueueAggregationBuffer agg;
@Before
public void setUp() throws Exception {
@@ -57,7 +55,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(3, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -81,7 +79,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(3, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -105,7 +103,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -129,7 +127,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -152,7 +150,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -176,7 +174,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -199,7 +197,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(3, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -257,7 +255,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -283,7 +281,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -308,7 +306,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -334,7 +332,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
index 9289a02..61642f1 100644
--- a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -18,27 +18,23 @@
*/
package hivemall.tools.map;
-import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.AscendingMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.DescendingMapEvaluator;
+
+import java.util.SortedMap;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
import org.junit.Assert;
import org.junit.Test;
-import java.util.SortedMap;
-
-@SuppressWarnings("deprecation")
public class UDAFToOrderedMapTest {
@Test
public void testNaturalOrder() throws Exception {
- GenericUDAFEvaluator evaluator = new NaturalOrderedMapEvaluator();
- NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ AscendingMapEvaluator evaluator = new AscendingMapEvaluator(0);
+ AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -61,12 +57,14 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals("apple", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
Assert.assertEquals("candy", sortedValues[2]);
+
+ evaluator.close();
}
@Test
public void testReverseOrder() throws Exception {
- GenericUDAFEvaluator evaluator = new ReverseOrderedMapEvaluator();
- ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ DescendingMapEvaluator evaluator = new DescendingMapEvaluator(0);
+ DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -90,12 +88,15 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals("candy", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
Assert.assertEquals("apple", sortedValues[2]);
+
+ evaluator.close();
}
@Test
public void testTopK() throws Exception {
- GenericUDAFEvaluator evaluator = new TopKOrderedMapEvaluator();
- TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ int size = 2;
+ DescendingMapEvaluator evaluator = new DescendingMapEvaluator(size);
+ DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -104,7 +105,6 @@ public class UDAFToOrderedMapTest {
final double[] keys = new double[] {0.7, 0.5, 0.8};
final String[] values = new String[] {"banana", "apple", "candy"};
- int size = 2;
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
evaluator.reset(agg);
@@ -119,12 +119,15 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals(size, sortedValues.length);
Assert.assertEquals("candy", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
+
+ evaluator.close();
}
@Test
public void testTailK() throws Exception {
- GenericUDAFEvaluator evaluator = new TailKOrderedMapEvaluator();
- TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ int size = -2;
+ AscendingMapEvaluator evaluator = new AscendingMapEvaluator(Math.abs(size));
+ AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -133,7 +136,6 @@ public class UDAFToOrderedMapTest {
final double[] keys = new double[] {0.7, 0.5, 0.8};
final String[] values = new String[] {"banana", "apple", "candy"};
- int size = -2;
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
evaluator.reset(agg);
@@ -148,6 +150,8 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals(Math.abs(size), sortedValues.length);
Assert.assertEquals("apple", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
+
+ evaluator.close();
}
}