You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by my...@apache.org on 2017/09/13 12:13:14 UTC

[4/5] incubator-hivemall git commit: Refactored to_ordered_list and to_ordered_map UDAF

Refactored to_ordered_list and to_ordered_map UDAF


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/69730f65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/69730f65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/69730f65

Branch: refs/heads/master
Commit: 69730f65d76b54890b141ee13567eb681c0374ae
Parents: 21a8781
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:26:49 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:27:23 2017 +0900

----------------------------------------------------------------------
 .../hivemall/tools/list/UDAFToOrderedList.java  | 116 +++++-----
 .../hivemall/tools/map/UDAFToOrderedMap.java    | 221 +++----------------
 .../tools/list/UDAFToOrderedListTest.java       |  32 ++-
 .../tools/map/UDAFToOrderedMapTest.java         |  40 ++--
 4 files changed, 132 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
index 16c966a..e88a16c 100644
--- a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
+++ b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
@@ -21,6 +21,23 @@ package hivemall.tools.list;
 import hivemall.utils.collections.BoundedPriorityQueue;
 import hivemall.utils.hadoop.HiveUtils;
 import hivemall.utils.lang.CommandLineUtils;
+import hivemall.utils.lang.NaturalComparator;
+import hivemall.utils.lang.Preconditions;
+import hivemall.utils.struct.Pair;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -33,25 +50,26 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.IntWritable;
 
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.*;
-
 /**
  * Return list of values sorted by value itself or specific key.
  */
-@Description(
-        name = "to_ordered_list",
-        value = "_FUNC_(value [, key, const string options]) - Return list of values sorted by value itself or specific key")
-public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
+@Description(name = "to_ordered_list",
+        value = "_FUNC_(PRIMITIVE value [, PRIMITIVE key, const string options])"
+                + " - Return list of values sorted by value itself or specific key")
+public final class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
     @Override
     public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
@@ -151,7 +169,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
             int k = 0;
             boolean reverseOrder = false;
-
             if (argOIs.length >= optionIndex + 1) {
                 String rawArgs = HiveUtils.getConstString(argOIs[optionIndex]);
                 cl = parseOptions(rawArgs);
@@ -161,18 +178,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 if (cl.hasOption("k")) {
                     k = Integer.parseInt(cl.getOptionValue("k"));
                     if (k == 0) {
-                        throw new UDFArgumentException("`k` must be nonzero: " + k);
+                        throw new UDFArgumentException("`k` must be non-zero value: " + k);
                     }
                 }
             }
-
             this.size = Math.abs(k);
 
-            if ((k > 0 && reverseOrder) || (k < 0 && !reverseOrder) || (k == 0 && !reverseOrder)) {
-                // reverse top-k, natural tail-k = ascending = natural order output = reverse order priority queue
+            if ((k > 0 && reverseOrder) || (k < 0 && reverseOrder == false)
+                    || (k == 0 && reverseOrder == false)) {
+                // top-k on reverse order = tail-k on natural order (so, top-k on descending)
                 this.reverseOrder = true;
-            } else { // (k > 0 && !reverseOrder) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
-                // natural top-k or reverse tail-k = descending = reverse order output = natural order priority queue
+            } else { // (k > 0 && reverseOrder == false) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
+                // top-k on natural order = tail-k on reverse order (so, top-k on ascending)
                 this.reverseOrder = false;
             }
 
@@ -190,7 +207,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                         || (argOIs.length == 3 && HiveUtils.isConstString(argOIs[2]));
 
                 if (sortByKey) {
-                    this.valueOI = argOIs[0];
+                    this.valueOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
                     this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[1]);
                 } else {
                     // sort values by value itself
@@ -230,20 +247,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
             return outputOI;
         }
 
+        @Nonnull
         private static StructObjectInspector internalMergeOI(@Nonnull ObjectInspector valueOI,
                 @Nonnull PrimitiveObjectInspector keyOI) {
-            ArrayList<String> fieldNames = new ArrayList<String>();
-            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+            List<String> fieldNames = new ArrayList<String>();
+            List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
 
             fieldNames.add("valueList");
             fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
             fieldNames.add("keyList");
             fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(keyOI)));
-
             fieldNames.add("size");
             fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
             fieldNames.add("reverseOrder");
             fieldOIs.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
 
@@ -295,10 +310,10 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 throws HiveException {
             QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
 
-            Map<String, List<Object>> tuples = myagg.drainQueue();
-            List<Object> valueList = tuples.get("value");
-            List<Object> keyList = tuples.get("key");
-            if (valueList.size() == 0) {
+            Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+            List<Object> keyList = tuples.getKey();
+            List<Object> valueList = tuples.getValue();
+            if (valueList.isEmpty()) {
                 return null;
             }
 
@@ -307,7 +322,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
             partialResult[1] = keyList;
             partialResult[2] = new IntWritable(myagg.size);
             partialResult[3] = new BooleanWritable(myagg.reverseOrder);
-
             return partialResult;
         }
 
@@ -345,11 +359,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
         }
 
         @Override
-        public Object terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+        public List<Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
             QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
-            Map<String, List<Object>> tuples = myagg.drainQueue();
-            return tuples.get("value");
+            Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+            return tuples.getValue();
         }
 
         static class QueueAggregationBuffer extends AbstractAggregationBuffer {
@@ -374,14 +388,14 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 this.reverseOrder = reverseOrder;
             }
 
-            void iterate(TupleWithKey tuple) {
+            void iterate(@Nonnull TupleWithKey tuple) {
                 if (queueHandler == null) {
                     initQueueHandler();
                 }
                 queueHandler.offer(tuple);
             }
 
-            void merge(List<Object> o_keyList, List<Object> o_valueList) {
+            void merge(@Nonnull List<Object> o_keyList, @Nonnull List<Object> o_valueList) {
                 if (queueHandler == null) {
                     initQueueHandler();
                 }
@@ -391,7 +405,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
             }
 
             @Nonnull
-            Map<String, List<Object>> drainQueue() {
+            Pair<List<Object>, List<Object>> drainQueue() {
                 int n = queueHandler.size();
                 final Object[] keys = new Object[n];
                 final Object[] values = new Object[n];
@@ -402,10 +416,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 }
                 queueHandler.clear();
 
-                Map<String, List<Object>> res = new HashMap<String, List<Object>>();
-                res.put("key", Arrays.asList(keys));
-                res.put("value", Arrays.asList(values));
-                return res;
+                return Pair.of(Arrays.asList(keys), Arrays.asList(values));
             }
 
             private void initQueueHandler() {
@@ -413,12 +424,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 if (reverseOrder) {
                     comparator = Collections.reverseOrder();
                 } else {
-                    comparator = new Comparator<TupleWithKey>() {
-                        @Override
-                        public int compare(TupleWithKey o1, TupleWithKey o2) {
-                            return o1.compareTo(o2);
-                        }
-                    };
+                    comparator = NaturalComparator.getInstance();
                 }
 
                 if (size > 0) {
@@ -436,10 +442,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
          */
         private static abstract class AbstractQueueHandler {
 
-            abstract void offer(TupleWithKey tuple);
+            abstract void offer(@Nonnull TupleWithKey tuple);
 
             abstract int size();
 
+            @Nullable
             abstract TupleWithKey poll();
 
             abstract void clear();
@@ -450,6 +457,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
             private static final int DEFAULT_INITIAL_CAPACITY = 11; // same as PriorityQueue
 
+            @Nonnull
             private final PriorityQueue<TupleWithKey> queue;
 
             QueueHandler(@Nonnull Comparator<TupleWithKey> comparator) {
@@ -480,6 +488,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
         private static final class BoundedQueueHandler extends AbstractQueueHandler {
 
+            @Nonnull
             private final BoundedPriorityQueue<TupleWithKey> queue;
 
             BoundedQueueHandler(int size, @Nonnull Comparator<TupleWithKey> comparator) {
@@ -509,24 +518,29 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
         }
 
         private static final class TupleWithKey implements Comparable<TupleWithKey> {
-            private Object key;
-            private Object value;
+            @Nonnull
+            private final Object key;
+            @Nonnull
+            private final Object value;
 
-            TupleWithKey(Object key, Object value) {
-                this.key = key;
-                this.value = value;
+            TupleWithKey(@CheckForNull Object key, @CheckForNull Object value) {
+                this.key = Preconditions.checkNotNull(key);
+                this.value = Preconditions.checkNotNull(value);
             }
 
+            @Nonnull
             Object getKey() {
                 return key;
             }
 
+            @Nonnull
             Object getValue() {
                 return value;
             }
 
             @Override
             public int compareTo(TupleWithKey o) {
+                @SuppressWarnings("unchecked")
                 Comparable<? super Object> k = (Comparable<? super Object>) key;
                 return k.compareTo(o.getKey());
             }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index 3e6caa4..a6b547f 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -18,14 +18,14 @@
  */
 package hivemall.tools.map;
 
+import hivemall.utils.collections.maps.BoundedSortedMap;
 import hivemall.utils.hadoop.HiveUtils;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Map;
-import java.util.SortedMap;
 import java.util.TreeMap;
 
+import javax.annotation.Nonnegative;
+
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
@@ -34,17 +34,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.IntWritable;
-
-import javax.annotation.Nonnull;
 
 /**
  * Convert two aggregated columns into a sorted key-value map.
@@ -52,13 +42,13 @@ import javax.annotation.Nonnull;
 @Description(name = "to_ordered_map",
         value = "_FUNC_(key, value [, const int k|const boolean reverseOrder=false]) "
                 + "- Convert two aggregated columns into an ordered key-value map")
-public class UDAFToOrderedMap extends UDAFToMap {
+public final class UDAFToOrderedMap extends UDAFToMap {
 
     @Override
     public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
             throws SemanticException {
         @SuppressWarnings("deprecation")
-        TypeInfo[] typeInfo = info.getParameters();
+        final TypeInfo[] typeInfo = info.getParameters();
         if (typeInfo.length != 2 && typeInfo.length != 3) {
             throw new UDFArgumentTypeException(typeInfo.length - 1,
                 "Expecting two or three arguments: " + typeInfo.length);
@@ -81,6 +71,7 @@ public class UDAFToOrderedMap extends UDAFToMap {
                     throw new UDFArgumentException("Map size must be nonzero: " + size);
                 }
                 reverseOrder = (size > 0); // positive size => top-k
+                size = Math.abs(size);
             } else {
                 throw new UDFArgumentTypeException(2,
                     "The third argument must be boolean or integer type: "
@@ -89,205 +80,53 @@ public class UDAFToOrderedMap extends UDAFToMap {
         }
 
         if (reverseOrder) { // descending
-            if (size != 0) {
-                return new TopKOrderedMapEvaluator();
-            }
-            return new ReverseOrderedMapEvaluator();
+            return new DescendingMapEvaluator(size);
         } else { // ascending
-            if (size != 0) {
-                return new TailKOrderedMapEvaluator();
-            }
-            return new NaturalOrderedMapEvaluator();
+            return new AscendingMapEvaluator(size);
         }
     }
 
-    public static class NaturalOrderedMapEvaluator extends UDAFToMapEvaluator {
-
-        @Override
-        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
-        }
-
-    }
+    public static final class AscendingMapEvaluator extends UDAFToMapEvaluator {
 
-    public static class ReverseOrderedMapEvaluator extends UDAFToMapEvaluator {
+        private final int size;
 
-        @Override
-        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
-                Collections.reverseOrder());
-        }
-
-    }
-
-    public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
-
-        protected PrimitiveObjectInspector inputKeyOI;
-        protected ObjectInspector inputValueOI;
-        protected StandardMapObjectInspector partialMapOI;
-        protected PrimitiveObjectInspector sizeOI;
-
-        protected StructObjectInspector internalMergeOI;
-
-        protected StructField partialMapField;
-        protected StructField sizeField;
-
-        @Override
-        public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
-            super.init(mode, argOIs);
-
-            // initialize input
-            if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
-                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
-                this.inputValueOI = argOIs[1];
-                this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
-            } else {// from partial aggregation
-                StructObjectInspector soi = (StructObjectInspector) argOIs[0];
-                this.internalMergeOI = soi;
-
-                this.partialMapField = soi.getStructFieldRef("partialMap");
-                // re-extract input key/value OIs
-                StandardMapObjectInspector partialMapOI = (StandardMapObjectInspector) partialMapField.getFieldObjectInspector();
-                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(partialMapOI.getMapKeyObjectInspector());
-                this.inputValueOI = partialMapOI.getMapValueObjectInspector();
-
-                this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
-                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
-                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
-
-                this.sizeField = soi.getStructFieldRef("size");
-                this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
-            }
-
-            // initialize output
-            final ObjectInspector outputOI;
-            if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
-                outputOI = internalMergeOI(inputKeyOI, inputValueOI);
-            } else {// terminate
-                outputOI = ObjectInspectorFactory.getStandardMapObjectInspector(
-                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
-                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
-            }
-            return outputOI;
-        }
-
-        private static StructObjectInspector internalMergeOI(
-                @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
-            ArrayList<String> fieldNames = new ArrayList<String>();
-            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-
-            fieldNames.add("partialMap");
-            fieldOIs.add(ObjectInspectorFactory.getStandardMapObjectInspector(
-                ObjectInspectorUtils.getStandardObjectInspector(keyOI),
-                ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
-            fieldNames.add("size");
-            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
-            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
-        }
-
-        static class MapAggregationBuffer extends AbstractAggregationBuffer {
-            Map<Object, Object> container;
-            int size;
-
-            MapAggregationBuffer() {
-                super();
-            }
+        AscendingMapEvaluator(@Nonnegative int size) {
+            super();
+            this.size = size;
         }
 
         @Override
         public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            myagg.container = new TreeMap<Object, Object>(Collections.reverseOrder());
-            myagg.size = Integer.MAX_VALUE;
-        }
-
-        @Override
-        public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
-            MapAggregationBuffer myagg = new MapAggregationBuffer();
-            reset(myagg);
-            return myagg;
-        }
-
-        @Override
-        public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
-                Object[] parameters) throws HiveException {
-            assert (parameters.length == 3);
-
-            if (parameters[0] == null) {
-                return;
+            if (size == 0) {
+                ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
+            } else {
+                ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size);
             }
-
-            Object key = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputKeyOI);
-            Object value = ObjectInspectorUtils.copyToStandardObject(parameters[1], inputValueOI);
-            int size = Math.abs(HiveUtils.getInt(parameters[2], sizeOI)); // size could be negative for tail-k
-
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            myagg.container.put(key, value);
-            myagg.size = size;
         }
 
-        @Override
-        public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-
-            Object[] partialResult = new Object[2];
-            partialResult[0] = myagg.container;
-            partialResult[1] = new IntWritable(myagg.size);
-
-            return partialResult;
-        }
+    }
 
-        @Override
-        public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
-                throws HiveException {
-            if (partial == null) {
-                return;
-            }
+    public static final class DescendingMapEvaluator extends UDAFToMapEvaluator {
 
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+        private final int size;
 
-            Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
-            Map<?, ?> partialMap = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
-            for (Map.Entry<?, ?> e : partialMap.entrySet()) {
-                Object key = ObjectInspectorUtils.copyToStandardObject(e.getKey(), inputKeyOI);
-                Object value = ObjectInspectorUtils.copyToStandardObject(e.getValue(), inputValueOI);
-                myagg.container.put(key, value);
-            }
-
-            Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
-            int size = HiveUtils.getInt(sizeObj, sizeOI);
-            myagg.size = size;
+        DescendingMapEvaluator(int size) {
+            super();
+            this.size = size;
         }
 
         @Override
-        public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            if (myagg.size < myagg.container.size()) {
-                Object toKey = myagg.container.keySet().toArray()[myagg.size];
-                return ((SortedMap<Object, Object>) myagg.container).headMap(toKey);
-            }
-            return myagg.container;
-        }
-
-    }
-
-    public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
-
-        @Override
         public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            myagg.container = new TreeMap<Object, Object>();
-            myagg.size = Integer.MAX_VALUE;
+            if (size == 0) {
+                ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
+                    Collections.reverseOrder());
+            } else {
+                ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size,
+                    true);
+            }
         }
 
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
index f466dbc..c3039d1 100644
--- a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
+++ b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
@@ -21,22 +21,20 @@ package hivemall.tools.list;
 import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator;
 import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator.QueueAggregationBuffer;
 
+import java.util.List;
+
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.List;
-
-@SuppressWarnings("deprecation")
 public class UDAFToOrderedListTest {
 
-    GenericUDAFEvaluator evaluator;
-    QueueAggregationBuffer agg;
+    private UDAFToOrderedListEvaluator evaluator;
+    private QueueAggregationBuffer agg;
 
     @Before
     public void setUp() throws Exception {
@@ -57,7 +55,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(3, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -81,7 +79,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(3, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -105,7 +103,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -129,7 +127,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -152,7 +150,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -176,7 +174,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -199,7 +197,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(3, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -257,7 +255,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -283,7 +281,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -308,7 +306,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -334,7 +332,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
index 9289a02..61642f1 100644
--- a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -18,27 +18,23 @@
  */
 package hivemall.tools.map;
 
-import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.AscendingMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.DescendingMapEvaluator;
+
+import java.util.SortedMap;
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.SortedMap;
-
-@SuppressWarnings("deprecation")
 public class UDAFToOrderedMapTest {
 
     @Test
     public void testNaturalOrder() throws Exception {
-        GenericUDAFEvaluator evaluator = new NaturalOrderedMapEvaluator();
-        NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        AscendingMapEvaluator evaluator = new AscendingMapEvaluator(0);
+        AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -61,12 +57,14 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals("apple", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
         Assert.assertEquals("candy", sortedValues[2]);
+
+        evaluator.close();
     }
 
     @Test
     public void testReverseOrder() throws Exception {
-        GenericUDAFEvaluator evaluator = new ReverseOrderedMapEvaluator();
-        ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        DescendingMapEvaluator evaluator = new DescendingMapEvaluator(0);
+        DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -90,12 +88,15 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals("candy", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
         Assert.assertEquals("apple", sortedValues[2]);
+
+        evaluator.close();
     }
 
     @Test
     public void testTopK() throws Exception {
-        GenericUDAFEvaluator evaluator = new TopKOrderedMapEvaluator();
-        TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        int size = 2;
+        DescendingMapEvaluator evaluator = new DescendingMapEvaluator(size);
+        DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -104,7 +105,6 @@ public class UDAFToOrderedMapTest {
 
         final double[] keys = new double[] {0.7, 0.5, 0.8};
         final String[] values = new String[] {"banana", "apple", "candy"};
-        int size = 2;
 
         evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
         evaluator.reset(agg);
@@ -119,12 +119,15 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals(size, sortedValues.length);
         Assert.assertEquals("candy", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
+
+        evaluator.close();
     }
 
     @Test
     public void testTailK() throws Exception {
-        GenericUDAFEvaluator evaluator = new TailKOrderedMapEvaluator();
-        TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        int size = -2;
+        AscendingMapEvaluator evaluator = new AscendingMapEvaluator(Math.abs(size));
+        AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -133,7 +136,6 @@ public class UDAFToOrderedMapTest {
 
         final double[] keys = new double[] {0.7, 0.5, 0.8};
         final String[] values = new String[] {"banana", "apple", "candy"};
-        int size = -2;
 
         evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
         evaluator.reset(agg);
@@ -148,6 +150,8 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals(Math.abs(size), sortedValues.length);
         Assert.assertEquals("apple", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
+
+        evaluator.close();
     }
 
 }