You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by my...@apache.org on 2017/09/13 12:13:11 UTC

[1/5] incubator-hivemall git commit: Close #108: [HIVEMALL-138] to_ordered_map & to_ordered_list as a UDAF variant of each_top_k

Repository: incubator-hivemall
Updated Branches:
  refs/heads/master 380478916 -> 688daa5f8


Close #108: [HIVEMALL-138] to_ordered_map & to_ordered_list as a UDAF variant of each_top_k


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/9cd3c59a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/9cd3c59a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/9cd3c59a

Branch: refs/heads/master
Commit: 9cd3c59aebb67cc6b58cdd611b96fcf42f297cde
Parents: 3804789
Author: Takuya Kitazawa <k....@gmail.com>
Authored: Mon Sep 11 15:38:05 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Mon Sep 11 15:38:05 2017 +0900

----------------------------------------------------------------------
 .../hivemall/tools/list/UDAFToOrderedList.java  | 535 +++++++++++++++++++
 .../hivemall/tools/map/UDAFToOrderedMap.java    | 214 +++++++-
 .../java/hivemall/utils/hadoop/HiveUtils.java   |   7 +
 .../tools/array/SelectKBeatUDFTest.java         |  69 ---
 .../tools/array/SelectKBestUDFTest.java         |  69 +++
 .../tools/list/UDAFToOrderedListTest.java       | 344 ++++++++++++
 .../tools/map/UDAFToOrderedMapTest.java         | 153 ++++++
 docs/gitbook/eval/rank.md                       |   5 +
 docs/gitbook/misc/generic_funcs.md              |   7 +-
 docs/gitbook/misc/topk.md                       |  63 +++
 docs/gitbook/recommend/item_based_cf.md         |   5 +
 docs/gitbook/recommend/movielens_cf.md          |   5 +
 resources/ddl/define-all-as-permanent.hive      |   7 +
 resources/ddl/define-all.hive                   |   7 +
 resources/ddl/define-all.spark                  |   7 +
 resources/ddl/define-udfs.td.hql                |   1 +
 16 files changed, 1419 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
new file mode 100644
index 0000000..16c966a
--- /dev/null
+++ b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.list;
+
+import hivemall.utils.collections.BoundedPriorityQueue;
+import hivemall.utils.hadoop.HiveUtils;
+import hivemall.utils.lang.CommandLineUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.*;
+
+/**
+ * Return list of values sorted by value itself or specific key.
+ */
+@Description(
+        name = "to_ordered_list",
+        value = "_FUNC_(value [, key, const string options]) - Return list of values sorted by value itself or specific key")
+public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
+
+    /**
+     * Validates the argument list and creates the evaluator.
+     *
+     * Accepted shapes are (value), (value, const string options), (value, key) and
+     * (value, key, const string options). The value must be primitive when no key is given; the
+     * key must be primitive when a key is given.
+     *
+     * @param info parameter information of the UDAF call
+     * @return a new {@link UDAFToOrderedListEvaluator}
+     * @throws SemanticException if the number or the category of arguments is not accepted
+     */
+    @Override
+    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
+            throws SemanticException {
+        @SuppressWarnings("deprecation")
+        final TypeInfo[] paramTypes = info.getParameters();
+        final ObjectInspector[] paramOIs = info.getParameterObjectInspectors();
+        final int numParams = paramTypes.length;
+
+        // (value) or (value, const string options)
+        final boolean valueOnly = (numParams == 1)
+                || (numParams == 2 && HiveUtils.isConstString(paramOIs[1]));
+        // (value, key) or (value, key, const string options)
+        final boolean valueWithKey = !valueOnly
+                && ((numParams == 2) || (numParams == 3 && HiveUtils.isConstString(paramOIs[2])));
+
+        if (valueOnly) {
+            // sort values by value itself w/o key
+            final TypeInfo valueType = paramTypes[0];
+            if (valueType.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+                throw new UDFArgumentTypeException(0,
+                    "Only primitive type arguments are accepted for value but "
+                            + valueType.getTypeName() + " was passed as the first parameter.");
+            }
+        } else if (valueWithKey) {
+            // sort values by key
+            final TypeInfo keyType = paramTypes[1];
+            if (keyType.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+                throw new UDFArgumentTypeException(1,
+                    "Only primitive type arguments are accepted for key but "
+                            + keyType.getTypeName() + " was passed as the second parameter.");
+            }
+        } else {
+            throw new UDFArgumentTypeException(numParams - 1,
+                "Number of arguments must be in [1, 3] including constant string for options: "
+                        + numParams);
+        }
+
+        return new UDAFToOrderedListEvaluator();
+    }
+
+    public static class UDAFToOrderedListEvaluator extends GenericUDAFEvaluator {
+
+        private ObjectInspector valueOI;
+        private PrimitiveObjectInspector keyOI;
+
+        private ListObjectInspector valueListOI;
+        private ListObjectInspector keyListOI;
+
+        private StructObjectInspector internalMergeOI;
+
+        private StructField valueListField;
+        private StructField keyListField;
+        private StructField sizeField;
+        private StructField reverseOrderField;
+
+        @Nonnegative
+        private int size;
+        private boolean reverseOrder;
+        private boolean sortByKey;
+
+        /**
+         * Builds the command line options understood by this function: {@code -k} (takes an
+         * argument) and {@code -reverse} / {@code -reverse_order} (flag).
+         *
+         * @return a fresh {@link Options} instance
+         */
+        protected Options getOptions() {
+            final Options options = new Options();
+            options.addOption("k", true, "To top-k (positive) or tail-k (negative) ordered queue");
+            options.addOption("reverse", "reverse_order", false,
+                "Sort values by key in a reverse (e.g., descending) order [default: false]");
+            return options;
+        }
+
+        /**
+         * Parses the whitespace-separated option string given as the last constant argument.
+         *
+         * When {@code -help} is present, the usage text is built and thrown as an
+         * {@link UDFArgumentException} so that it is shown to the user.
+         *
+         * @param optionValue raw option string, e.g. {@code "-k 3 -reverse"}
+         * @return the parsed command line
+         * @throws UDFArgumentException if parsing fails or {@code -help} was requested
+         */
+        @Nonnull
+        protected final CommandLine parseOptions(String optionValue) throws UDFArgumentException {
+            String[] args = optionValue.split("\\s+");
+            Options opts = getOptions();
+            opts.addOption("help", false, "Show function help");
+            CommandLine cl = CommandLineUtils.parseOptions(args, opts);
+
+            if (cl.hasOption("help")) {
+                // The @Description annotation is declared on the enclosing UDAF resolver, not on
+                // this nested evaluator class, so fall back to the resolver when the runtime
+                // class itself is not annotated.
+                Description funcDesc = getClass().getAnnotation(Description.class);
+                if (funcDesc == null) {
+                    funcDesc = UDAFToOrderedList.class.getAnnotation(Description.class);
+                }
+                final String cmdLineSyntax;
+                if (funcDesc == null) {
+                    cmdLineSyntax = getClass().getSimpleName();
+                } else {
+                    cmdLineSyntax = funcDesc.value().replace("_FUNC_", funcDesc.name());
+                }
+                StringWriter sw = new StringWriter();
+                sw.write('\n');
+                PrintWriter pw = new PrintWriter(sw);
+                HelpFormatter formatter = new HelpFormatter();
+                formatter.printHelp(pw, HelpFormatter.DEFAULT_WIDTH, cmdLineSyntax, null, opts,
+                    HelpFormatter.DEFAULT_LEFT_PAD, HelpFormatter.DEFAULT_DESC_PAD, null, true);
+                pw.flush();
+                String helpMsg = sw.toString();
+                throw new UDFArgumentException(helpMsg);
+            }
+
+            return cl;
+        }
+
+        /**
+         * Reads the optional constant option string and derives {@code size} and
+         * {@code reverseOrder} of this evaluator from it.
+         *
+         * The option string is expected at index 2 when sorting by key and at index 1 otherwise.
+         * Without options, {@code size} is 0 (unbounded) and {@code reverseOrder} is true.
+         *
+         * @param argOIs object inspectors of the original arguments
+         * @return the parsed command line, or {@code null} if no option argument was given
+         * @throws UDFArgumentException if the options cannot be parsed, or {@code k} is zero or
+         *         not an integer
+         */
+        protected CommandLine processOptions(ObjectInspector[] argOIs) throws UDFArgumentException {
+            CommandLine cl = null;
+
+            final int optionIndex = sortByKey ? 2 : 1;
+
+            int k = 0;
+            boolean reverseRequested = false;
+
+            if (argOIs.length >= optionIndex + 1) {
+                String rawArgs = HiveUtils.getConstString(argOIs[optionIndex]);
+                cl = parseOptions(rawArgs);
+
+                reverseRequested = cl.hasOption("reverse_order");
+
+                if (cl.hasOption("k")) {
+                    final String rawK = cl.getOptionValue("k");
+                    try {
+                        k = Integer.parseInt(rawK);
+                    } catch (NumberFormatException e) {
+                        // report a malformed `k` as an argument error instead of a raw
+                        // NumberFormatException
+                        UDFArgumentException ex = new UDFArgumentException(
+                            "`k` must be an integer: " + rawK);
+                        ex.initCause(e);
+                        throw ex;
+                    }
+                    if (k == 0) {
+                        throw new UDFArgumentException("`k` must be nonzero: " + k);
+                    }
+                }
+            }
+
+            this.size = Math.abs(k);
+
+            if ((k > 0 && reverseRequested) || (k < 0 && !reverseRequested)
+                    || (k == 0 && !reverseRequested)) {
+                // reverse top-k, natural tail-k = ascending = natural order output = reverse order priority queue
+                this.reverseOrder = true;
+            } else { // (k > 0 && !reverseRequested) || (k < 0 && reverseRequested) || (k == 0 && reverseRequested)
+                // natural top-k or reverse tail-k = descending = reverse order output = natural order priority queue
+                this.reverseOrder = false;
+            }
+
+            return cl;
+        }
+
+        /**
+         * Sets up input and output object inspectors for the given aggregation mode.
+         *
+         * In PARTIAL1 and COMPLETE the arguments are the original columns (value, optional key,
+         * optional constant options). Otherwise the single argument is the struct produced by
+         * {@link #terminatePartial}, with fields "valueList", "keyList", "size" and "reverseOrder".
+         *
+         * @param mode aggregation mode
+         * @param argOIs object inspectors of the arguments for this mode
+         * @return the partial-result struct inspector for PARTIAL1/PARTIAL2, otherwise a standard
+         *         list inspector over the value type
+         * @throws HiveException if the options are invalid or an inspector has an unexpected type
+         */
+        @Override
+        public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
+            super.init(mode, argOIs);
+
+            // initialize input
+            if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
+                // this flag will be used in `processOptions` and `iterate` (= when Mode.PARTIAL1 or Mode.COMPLETE)
+                this.sortByKey = (argOIs.length == 2 && !HiveUtils.isConstString(argOIs[1]))
+                        || (argOIs.length == 3 && HiveUtils.isConstString(argOIs[2]));
+
+                if (sortByKey) {
+                    this.valueOI = argOIs[0];
+                    this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[1]);
+                } else {
+                    // sort values by value itself
+                    this.valueOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+                    this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+                }
+
+                // must run after `sortByKey` is set because it decides the option index
+                processOptions(argOIs);
+            } else {// from partial aggregation
+                StructObjectInspector soi = (StructObjectInspector) argOIs[0];
+                this.internalMergeOI = soi;
+
+                // re-extract input value OI
+                this.valueListField = soi.getStructFieldRef("valueList");
+                StandardListObjectInspector valueListOI = (StandardListObjectInspector) valueListField.getFieldObjectInspector();
+                this.valueOI = valueListOI.getListElementObjectInspector();
+                this.valueListOI = ObjectInspectorFactory.getStandardListObjectInspector(valueOI);
+
+                // re-extract input key OI
+                this.keyListField = soi.getStructFieldRef("keyList");
+                StandardListObjectInspector keyListOI = (StandardListObjectInspector) keyListField.getFieldObjectInspector();
+                this.keyOI = HiveUtils.asPrimitiveObjectInspector(keyListOI.getListElementObjectInspector());
+                this.keyListOI = ObjectInspectorFactory.getStandardListObjectInspector(keyOI);
+
+                // `size` and `reverseOrder` themselves are read from each partial in `merge`
+                this.sizeField = soi.getStructFieldRef("size");
+                this.reverseOrderField = soi.getStructFieldRef("reverseOrder");
+            }
+
+            // initialize output
+            final ObjectInspector outputOI;
+            if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
+                outputOI = internalMergeOI(valueOI, keyOI);
+            } else {// terminate
+                outputOI = ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(valueOI));
+            }
+
+            return outputOI;
+        }
+
+        /**
+         * Creates the struct inspector describing a partial aggregation result: a list of values,
+         * a list of keys, the queue size and the reverse-order flag.
+         *
+         * @param valueOI inspector of a single value
+         * @param keyOI inspector of a single key
+         * @return struct inspector with fields "valueList", "keyList", "size" and "reverseOrder"
+         */
+        private static StructObjectInspector internalMergeOI(@Nonnull ObjectInspector valueOI,
+                @Nonnull PrimitiveObjectInspector keyOI) {
+            final ObjectInspector valueListOI = ObjectInspectorFactory.getStandardListObjectInspector(
+                ObjectInspectorUtils.getStandardObjectInspector(valueOI));
+            final ObjectInspector keyListOI = ObjectInspectorFactory.getStandardListObjectInspector(
+                ObjectInspectorUtils.getStandardObjectInspector(keyOI));
+
+            final ArrayList<String> names = new ArrayList<String>(4);
+            final ArrayList<ObjectInspector> inspectors = new ArrayList<ObjectInspector>(4);
+
+            names.add("valueList");
+            inspectors.add(valueListOI);
+
+            names.add("keyList");
+            inspectors.add(keyListOI);
+
+            names.add("size");
+            inspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+
+            names.add("reverseOrder");
+            inspectors.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
+
+            return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
+        }
+
+        /**
+         * Creates an aggregation buffer configured with this evaluator's size and ordering.
+         *
+         * @return a freshly reset {@link QueueAggregationBuffer}
+         */
+        @SuppressWarnings("deprecation")
+        @Override
+        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+            final QueueAggregationBuffer buffer = new QueueAggregationBuffer();
+            reset(buffer);
+            return buffer;
+        }
+
+        /**
+         * Re-initializes the buffer with this evaluator's current size and ordering.
+         *
+         * @param agg buffer to reset; must be a {@link QueueAggregationBuffer}
+         */
+        @Override
+        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            ((QueueAggregationBuffer) agg).reset(size, reverseOrder);
+        }
+
+        /**
+         * Offers one input row to the buffer. Rows with a null value, or with a null key when
+         * sorting by key, are ignored. Without a key, the value itself is used as the key.
+         *
+         * @param agg buffer to update; must be a {@link QueueAggregationBuffer}
+         * @param parameters the value at index 0 and, when sorting by key, the key at index 1
+         */
+        @Override
+        public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
+                Object[] parameters) throws HiveException {
+            final Object rawValue = parameters[0];
+            if (rawValue == null) {
+                return;
+            }
+            final Object value = ObjectInspectorUtils.copyToStandardObject(rawValue, valueOI);
+
+            // choose the key source: the explicit key column, or the value itself
+            final Object rawKey = sortByKey ? parameters[1] : rawValue;
+            if (rawKey == null) {
+                return;
+            }
+            final ObjectInspector rawKeyOI = sortByKey ? keyOI : valueOI;
+            final Object key = ObjectInspectorUtils.copyToStandardObject(rawKey, rawKeyOI);
+
+            ((QueueAggregationBuffer) agg).iterate(new TupleWithKey(key, value));
+        }
+
+        /**
+         * Drains the buffer into a partial result laid out as
+         * {@code [valueList, keyList, size, reverseOrder]}.
+         *
+         * @param agg buffer to drain; must be a {@link QueueAggregationBuffer}
+         * @return the partial result, or {@code null} when the buffer holds no values
+         */
+        @Override
+        public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final QueueAggregationBuffer buffer = (QueueAggregationBuffer) agg;
+
+            final Map<String, List<Object>> drained = buffer.drainQueue();
+            final List<Object> values = drained.get("value");
+            final List<Object> keys = drained.get("key");
+            if (values.isEmpty()) {
+                return null;
+            }
+
+            return new Object[] {values, keys, new IntWritable(buffer.size),
+                    new BooleanWritable(buffer.reverseOrder)};
+        }
+
+        /**
+         * Merges a partial result produced by {@link #terminatePartial} into the buffer.
+         *
+         * The size and ordering stored in the partial are applied to the buffer before its
+         * key/value pairs are offered.
+         *
+         * @param agg buffer to merge into; must be a {@link QueueAggregationBuffer}
+         * @param partial partial result struct; ignored when {@code null}
+         */
+        @Override
+        public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
+                throws HiveException {
+            if (partial == null) {
+                return;
+            }
+
+            // copy every value into a standard object so it can be kept in the queue
+            Object valueListObj = internalMergeOI.getStructFieldData(partial, valueListField);
+            final List<?> valueListRaw = valueListOI.getList(HiveUtils.castLazyBinaryObject(valueListObj));
+            final List<Object> valueList = new ArrayList<Object>();
+            for (int i = 0, n = valueListRaw.size(); i < n; i++) {
+                valueList.add(ObjectInspectorUtils.copyToStandardObject(valueListRaw.get(i),
+                    valueOI));
+            }
+
+            // same for the keys; keyList is index-aligned with valueList
+            Object keyListObj = internalMergeOI.getStructFieldData(partial, keyListField);
+            final List<?> keyListRaw = keyListOI.getList(HiveUtils.castLazyBinaryObject(keyListObj));
+            final List<Object> keyList = new ArrayList<Object>();
+            for (int i = 0, n = keyListRaw.size(); i < n; i++) {
+                keyList.add(ObjectInspectorUtils.copyToStandardObject(keyListRaw.get(i), keyOI));
+            }
+
+            Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
+            int size = PrimitiveObjectInspectorFactory.writableIntObjectInspector.get(sizeObj);
+
+            Object reverseOrderObj = internalMergeOI.getStructFieldData(partial, reverseOrderField);
+            boolean reverseOrder = PrimitiveObjectInspectorFactory.writableBooleanObjectInspector.get(reverseOrderObj);
+
+            // options are not parsed in the merge modes, so take them from the partial itself
+            QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
+            myagg.setOptions(size, reverseOrder);
+            myagg.merge(keyList, valueList);
+        }
+
+        /**
+         * Drains the buffer and returns the ordered values as the final result.
+         *
+         * @param agg buffer to drain; must be a {@link QueueAggregationBuffer}
+         * @return the list stored under "value" by {@code drainQueue()}
+         */
+        @Override
+        public Object terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final QueueAggregationBuffer buffer = (QueueAggregationBuffer) agg;
+            return buffer.drainQueue().get("value");
+        }
+
+        /**
+         * Aggregation buffer that keeps key/value tuples in a priority queue.
+         *
+         * The queue is created lazily on the first {@code iterate} or {@code merge} call, using
+         * the size and ordering set at that time: a bounded queue when {@code size > 0}, an
+         * unbounded one otherwise.
+         */
+        static class QueueAggregationBuffer extends AbstractAggregationBuffer {
+
+            // null until the first tuple is offered
+            private AbstractQueueHandler queueHandler;
+
+            @Nonnegative
+            private int size;
+            private boolean reverseOrder;
+
+            QueueAggregationBuffer() {
+                super();
+            }
+
+            /** Applies the given options and discards the current queue, if any. */
+            void reset(@Nonnegative int size, boolean reverseOrder) {
+                setOptions(size, reverseOrder);
+                this.queueHandler = null;
+            }
+
+            /** Sets the options used when the queue is (lazily) created. */
+            void setOptions(@Nonnegative int size, boolean reverseOrder) {
+                this.size = size;
+                this.reverseOrder = reverseOrder;
+            }
+
+            /** Offers a single tuple, creating the queue on first use. */
+            void iterate(TupleWithKey tuple) {
+                if (queueHandler == null) {
+                    initQueueHandler();
+                }
+                queueHandler.offer(tuple);
+            }
+
+            /** Offers index-aligned key/value pairs, creating the queue on first use. */
+            void merge(List<Object> o_keyList, List<Object> o_valueList) {
+                if (queueHandler == null) {
+                    initQueueHandler();
+                }
+                for (int i = 0, n = o_keyList.size(); i < n; i++) {
+                    queueHandler.offer(new TupleWithKey(o_keyList.get(i), o_valueList.get(i)));
+                }
+            }
+
+            /**
+             * Empties the queue and returns its content as two index-aligned lists stored under
+             * "key" and "value". Elements are placed in reverse polling order.
+             *
+             * @return map holding the "key" and "value" lists; both lists are empty when nothing
+             *         has been offered since the last reset
+             */
+            @Nonnull
+            Map<String, List<Object>> drainQueue() {
+                // The queue does not exist yet when no tuple was ever offered (e.g., empty group
+                // or all-null input); treat that as an empty queue instead of failing with NPE.
+                final int n = (queueHandler == null) ? 0 : queueHandler.size();
+                final Object[] keys = new Object[n];
+                final Object[] values = new Object[n];
+                for (int i = n - 1; i >= 0; i--) { // head element in queue should be stored to tail of array
+                    TupleWithKey tuple = queueHandler.poll();
+                    keys[i] = tuple.getKey();
+                    values[i] = tuple.getValue();
+                }
+                if (queueHandler != null) {
+                    queueHandler.clear();
+                }
+
+                Map<String, List<Object>> res = new HashMap<String, List<Object>>();
+                res.put("key", Arrays.asList(keys));
+                res.put("value", Arrays.asList(values));
+                return res;
+            }
+
+            /** Creates the queue handler from the current {@code size} and {@code reverseOrder}. */
+            private void initQueueHandler() {
+                final Comparator<TupleWithKey> comparator;
+                if (reverseOrder) {
+                    comparator = Collections.reverseOrder();
+                } else {
+                    comparator = new Comparator<TupleWithKey>() {
+                        @Override
+                        public int compare(TupleWithKey o1, TupleWithKey o2) {
+                            return o1.compareTo(o2);
+                        }
+                    };
+                }
+
+                if (size > 0) {
+                    this.queueHandler = new BoundedQueueHandler(size, comparator);
+                } else {
+                    this.queueHandler = new QueueHandler(comparator);
+                }
+            }
+
+        }
+
+        /**
+         * Common facade over the two queue implementations used by the aggregation buffer.
+         * BoundedPriorityQueue does not directly inherit PriorityQueue, so each of them is wrapped
+         * by its own handler subclass.
+         */
+        private static abstract class AbstractQueueHandler {
+
+            /** Returns the number of tuples currently held. */
+            abstract int size();
+
+            /** Offers a tuple to the underlying queue. */
+            abstract void offer(TupleWithKey tuple);
+
+            /** Polls a tuple from the underlying queue. */
+            abstract TupleWithKey poll();
+
+            /** Clears the underlying queue. */
+            abstract void clear();
+
+        }
+
+        /** Handler that delegates to an unbounded {@link PriorityQueue}. */
+        private static final class QueueHandler extends AbstractQueueHandler {
+
+            // same as PriorityQueue
+            private static final int INITIAL_CAPACITY = 11;
+
+            private final PriorityQueue<TupleWithKey> heap;
+
+            QueueHandler(@Nonnull Comparator<TupleWithKey> comparator) {
+                this.heap = new PriorityQueue<TupleWithKey>(INITIAL_CAPACITY, comparator);
+            }
+
+            @Override
+            int size() {
+                return heap.size();
+            }
+
+            @Override
+            void offer(TupleWithKey tuple) {
+                heap.offer(tuple);
+            }
+
+            @Override
+            TupleWithKey poll() {
+                return heap.poll();
+            }
+
+            @Override
+            void clear() {
+                heap.clear();
+            }
+
+        }
+
+        /** Handler that delegates to a {@link BoundedPriorityQueue} created with the given size. */
+        private static final class BoundedQueueHandler extends AbstractQueueHandler {
+
+            private final BoundedPriorityQueue<TupleWithKey> boundedQueue;
+
+            BoundedQueueHandler(int size, @Nonnull Comparator<TupleWithKey> comparator) {
+                this.boundedQueue = new BoundedPriorityQueue<TupleWithKey>(size, comparator);
+            }
+
+            @Override
+            int size() {
+                return boundedQueue.size();
+            }
+
+            @Override
+            void offer(TupleWithKey tuple) {
+                boundedQueue.offer(tuple);
+            }
+
+            @Override
+            TupleWithKey poll() {
+                return boundedQueue.poll();
+            }
+
+            @Override
+            void clear() {
+                boundedQueue.clear();
+            }
+
+        }
+
+        /**
+         * Key/value pair ordered by its key. The key is cast to {@link Comparable} when two
+         * tuples are compared.
+         */
+        private static final class TupleWithKey implements Comparable<TupleWithKey> {
+            private Object key;
+            private Object value;
+
+            TupleWithKey(Object key, Object value) {
+                this.key = key;
+                this.value = value;
+            }
+
+            Object getKey() {
+                return this.key;
+            }
+
+            Object getValue() {
+                return this.value;
+            }
+
+            /** Compares this tuple's key with the other tuple's key; values are not considered. */
+            @Override
+            public int compareTo(TupleWithKey other) {
+                final Comparable<? super Object> self = (Comparable<? super Object>) this.key;
+                final Object otherKey = other.getKey();
+                return self.compareTo(otherKey);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index 4d011cd..3e6caa4 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -20,23 +20,37 @@ package hivemall.tools.map;
 
 import hivemall.utils.hadoop.HiveUtils;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.IntWritable;
+
+import javax.annotation.Nonnull;
 
 /**
  * Convert two aggregated columns into a sorted key-value map.
  */
 @Description(name = "to_ordered_map",
-        value = "_FUNC_(key, value [, const boolean reverseOrder=false]) "
+        value = "_FUNC_(key, value [, const int k|const boolean reverseOrder=false]) "
                 + "- Convert two aggregated columns into an ordered key-value map")
 public class UDAFToOrderedMap extends UDAFToMap {
 
@@ -54,19 +68,35 @@ public class UDAFToOrderedMap extends UDAFToMap {
                 "Only primitive type arguments are accepted for the key but "
                         + typeInfo[0].getTypeName() + " was passed as parameter 1.");
         }
+
         boolean reverseOrder = false;
+        int size = 0;
         if (typeInfo.length == 3) {
-            if (HiveUtils.isBooleanTypeInfo(typeInfo[2]) == false) {
-                throw new UDFArgumentTypeException(2, "The three argument must be boolean type: "
-                        + typeInfo[2].getTypeName());
-            }
             ObjectInspector[] argOIs = info.getParameterObjectInspectors();
-            reverseOrder = HiveUtils.getConstBoolean(argOIs[2]);
+            if (HiveUtils.isBooleanTypeInfo(typeInfo[2])) {
+                reverseOrder = HiveUtils.getConstBoolean(argOIs[2]);
+            } else if (HiveUtils.isIntegerTypeInfo(typeInfo[2])) {
+                size = HiveUtils.getConstInt(argOIs[2]);
+                if (size == 0) {
+                    throw new UDFArgumentException("Map size must be nonzero: " + size);
+                }
+                reverseOrder = (size > 0); // positive size => top-k
+            } else {
+                throw new UDFArgumentTypeException(2,
+                    "The third argument must be boolean or integer type: "
+                            + typeInfo[2].getTypeName());
+            }
         }
 
-        if (reverseOrder) {
+        if (reverseOrder) { // descending
+            if (size != 0) {
+                return new TopKOrderedMapEvaluator();
+            }
             return new ReverseOrderedMapEvaluator();
-        } else {
+        } else { // ascending
+            if (size != 0) {
+                return new TailKOrderedMapEvaluator();
+            }
             return new NaturalOrderedMapEvaluator();
         }
     }
@@ -92,4 +122,172 @@ public class UDAFToOrderedMap extends UDAFToMap {
 
     }
 
+    /**
+     * Evaluator for the top-k variant: entries are collected into a map sorted by key in reverse
+     * natural order, and {@link #terminate} returns at most {@code size} leading entries of it.
+     *
+     * <p>
+     * The partial aggregation result is a struct with two fields: {@code partialMap} (the entries
+     * collected so far) and {@code size} (the absolute value of the requested map size).
+     */
+    public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
+
+        protected PrimitiveObjectInspector inputKeyOI;
+        protected ObjectInspector inputValueOI;
+        protected StandardMapObjectInspector partialMapOI;
+        protected PrimitiveObjectInspector sizeOI;
+
+        protected StructObjectInspector internalMergeOI;
+
+        protected StructField partialMapField;
+        protected StructField sizeField;
+
+        /**
+         * Sets up the input inspectors for the given mode and returns the output inspector.
+         *
+         * @param mode aggregation mode; PARTIAL1/COMPLETE read (key, value, size) arguments, the
+         *        other modes read the partial struct produced by {@link #terminatePartial}
+         * @param argOIs inspectors of the arguments for that mode
+         * @return the partial struct inspector for PARTIAL1/PARTIAL2, otherwise a standard map
+         *         inspector over the key and value types
+         */
+        @Override
+        public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
+            super.init(mode, argOIs);
+
+            // initialize input
+            if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
+                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+                this.inputValueOI = argOIs[1];
+                this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
+            } else {// from partial aggregation
+                StructObjectInspector soi = (StructObjectInspector) argOIs[0];
+                this.internalMergeOI = soi;
+
+                this.partialMapField = soi.getStructFieldRef("partialMap");
+                // re-extract input key/value OIs; the local is named differently from the
+                // partialMapOI field so that it does not shadow it
+                StandardMapObjectInspector serializedMapOI = (StandardMapObjectInspector) partialMapField.getFieldObjectInspector();
+                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(serializedMapOI.getMapKeyObjectInspector());
+                this.inputValueOI = serializedMapOI.getMapValueObjectInspector();
+
+                this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
+                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+
+                this.sizeField = soi.getStructFieldRef("size");
+                this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
+            }
+
+            // initialize output
+            final ObjectInspector outputOI;
+            if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
+                outputOI = internalMergeOI(inputKeyOI, inputValueOI);
+            } else {// terminate
+                outputOI = ObjectInspectorFactory.getStandardMapObjectInspector(
+                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+            }
+            return outputOI;
+        }
+
+        /**
+         * Builds the inspector of the partial result: a struct of {@code partialMap} (standard
+         * map over the given key/value types) and {@code size} (writable int).
+         */
+        private static StructObjectInspector internalMergeOI(
+                @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
+            ArrayList<String> fieldNames = new ArrayList<String>();
+            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+
+            fieldNames.add("partialMap");
+            fieldOIs.add(ObjectInspectorFactory.getStandardMapObjectInspector(
+                ObjectInspectorUtils.getStandardObjectInspector(keyOI),
+                ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
+
+            fieldNames.add("size");
+            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+
+            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
+        }
+
+        /** Aggregation state: the sorted entries and the maximum number of entries to return. */
+        static class MapAggregationBuffer extends AbstractAggregationBuffer {
+            Map<Object, Object> container;
+            int size;
+
+            MapAggregationBuffer() {
+                super();
+            }
+        }
+
+        /**
+         * Clears the buffer: an empty map sorted in reverse natural key order, and
+         * {@code Integer.MAX_VALUE} as the "no limit seen yet" size.
+         */
+        @Override
+        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+            myagg.container = new TreeMap<Object, Object>(Collections.reverseOrder());
+            myagg.size = Integer.MAX_VALUE;
+        }
+
+        @Override
+        public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
+            MapAggregationBuffer myagg = new MapAggregationBuffer();
+            reset(myagg);
+            return myagg;
+        }
+
+        /**
+         * Adds one (key, value) row to the buffer and records the requested size. Rows whose key
+         * is {@code null} are ignored entirely.
+         *
+         * @param parameters {@code [key, value, size]}
+         */
+        @Override
+        public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
+                Object[] parameters) throws HiveException {
+            assert (parameters.length == 3);
+
+            if (parameters[0] == null) {
+                return;
+            }
+
+            Object key = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputKeyOI);
+            Object value = ObjectInspectorUtils.copyToStandardObject(parameters[1], inputValueOI);
+            int size = Math.abs(HiveUtils.getInt(parameters[2], sizeOI)); // size could be negative for tail-k
+
+            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+            myagg.container.put(key, value);
+            myagg.size = size;
+        }
+
+        /**
+         * Emits the partial result as {@code [partialMap, size]}, matching the struct described
+         * by {@link #internalMergeOI(PrimitiveObjectInspector, ObjectInspector)}.
+         */
+        @Override
+        public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+
+            Object[] partialResult = new Object[2];
+            partialResult[0] = myagg.container;
+            partialResult[1] = new IntWritable(myagg.size);
+
+            return partialResult;
+        }
+
+        /**
+         * Folds a partial result into the buffer: all of its entries are added to the sorted map
+         * and its size limit is combined with the one already held.
+         */
+        @Override
+        public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
+                throws HiveException {
+            if (partial == null) {
+                return;
+            }
+
+            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+
+            Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
+            Map<?, ?> partialMap = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
+            for (Map.Entry<?, ?> e : partialMap.entrySet()) {
+                Object key = ObjectInspectorUtils.copyToStandardObject(e.getKey(), inputKeyOI);
+                Object value = ObjectInspectorUtils.copyToStandardObject(e.getValue(), inputValueOI);
+                myagg.container.put(key, value);
+            }
+
+            Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
+            int size = HiveUtils.getInt(sizeObj, sizeOI);
+            // A partial that never recorded a size (no rows, or only null keys) still carries the
+            // Integer.MAX_VALUE set by reset(). Keep the smaller value so that such a partial,
+            // when merged last, cannot wipe out the real limit and disable the truncation.
+            myagg.size = Math.min(myagg.size, size);
+        }
+
+        /**
+         * Returns the aggregated map, cut down to its first {@code size} entries when it holds
+         * more than that.
+         */
+        @Override
+        public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+            if (myagg.size < myagg.container.size()) {
+                // the key at index `size` is the first one to exclude; headMap is exclusive
+                Object toKey = myagg.container.keySet().toArray()[myagg.size];
+                return ((SortedMap<Object, Object>) myagg.container).headMap(toKey);
+            }
+            return myagg.container;
+        }
+
+    }
+
+    /**
+     * Tail-k counterpart of {@link TopKOrderedMapEvaluator}: it inherits the whole aggregation
+     * logic and only swaps the backing map for one sorted in natural key order.
+     */
+    public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
+
+        /** Clears the buffer to an empty naturally-ordered map with no size limit seen yet. */
+        @Override
+        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            buffer.size = Integer.MAX_VALUE;
+            buffer.container = new TreeMap<Object, Object>();
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
index db56b82..afa8a58 100644
--- a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
+++ b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
@@ -440,6 +440,13 @@ public final class HiveUtils {
         return PrimitiveObjectInspectorUtils.getDouble(o, oi);
     }
 
+    /**
+     * Reads {@code o} as an {@code int} through the given inspector.
+     *
+     * @param o the value to convert; may be {@code null}
+     * @param oi the primitive inspector used to interpret {@code o}
+     * @return {@code 0} when {@code o} is {@code null}, otherwise the result of
+     *         {@code PrimitiveObjectInspectorUtils.getInt(o, oi)}
+     */
+    public static int getInt(@Nullable Object o, @Nonnull PrimitiveObjectInspector oi) {
+        return (o == null) ? 0 : PrimitiveObjectInspectorUtils.getInt(o, oi);
+    }
+
     @SuppressWarnings("unchecked")
     @Nullable
     public static <T extends Writable> T getConstValue(@Nonnull final ObjectInspector oi)

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java b/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java
deleted file mode 100644
index 3e3fc12..0000000
--- a/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.tools.array;
-
-import hivemall.utils.hadoop.WritableUtils;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SelectKBeatUDFTest {
-
-    @Test
-    public void test() throws Exception {
-        final SelectKBestUDF selectKBest = new SelectKBestUDF();
-        final int k = 2;
-        final double[] data = new double[] {250.29999999999998, 170.90000000000003, 73.2,
-                12.199999999999996};
-        final double[] importanceList = new double[] {292.1666753739119, 152.70000455081467,
-                187.93333893418327, 59.93333511948589};
-
-        final GenericUDF.DeferredObject[] dObjs = new GenericUDF.DeferredObject[] {
-                new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(data)),
-                new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(importanceList)),
-                new GenericUDF.DeferredJavaObject(k)};
-
-        selectKBest.initialize(new ObjectInspector[] {
-                ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
-                ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
-                ObjectInspectorUtils.getConstantObjectInspector(
-                    PrimitiveObjectInspectorFactory.javaIntObjectInspector, k)});
-        final List<DoubleWritable> resultObj = selectKBest.evaluate(dObjs);
-
-        Assert.assertEquals(resultObj.size(), k);
-
-        final double[] result = new double[k];
-        for (int i = 0; i < k; i++) {
-            result[i] = resultObj.get(i).get();
-        }
-
-        final double[] answer = new double[] {250.29999999999998, 73.2};
-
-        Assert.assertArrayEquals(answer, result, 0.d);
-        selectKBest.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java b/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java
new file mode 100644
index 0000000..15366a7
--- /dev/null
+++ b/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.array;
+
+import hivemall.utils.hadoop.WritableUtils;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link SelectKBestUDF}.
+ */
+public class SelectKBestUDFTest {
+
+    /**
+     * Feeds four data values with four importance scores and {@code k = 2}, then checks that the
+     * result holds exactly the data values at the positions of the two largest importance scores
+     * (indexes 0 and 2), in their original order.
+     */
+    @Test
+    public void test() throws Exception {
+        final SelectKBestUDF selectKBest = new SelectKBestUDF();
+        final int k = 2;
+        final double[] data = new double[] {250.29999999999998, 170.90000000000003, 73.2,
+                12.199999999999996};
+        final double[] importanceList = new double[] {292.1666753739119, 152.70000455081467,
+                187.93333893418327, 59.93333511948589};
+
+        final GenericUDF.DeferredObject[] dObjs = new GenericUDF.DeferredObject[] {
+                new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(data)),
+                new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(importanceList)),
+                new GenericUDF.DeferredJavaObject(k)};
+
+        selectKBest.initialize(new ObjectInspector[] {
+                ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
+                ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
+                ObjectInspectorUtils.getConstantObjectInspector(
+                    PrimitiveObjectInspectorFactory.javaIntObjectInspector, k)});
+        final List<DoubleWritable> resultObj = selectKBest.evaluate(dObjs);
+
+        // JUnit's contract is assertEquals(expected, actual); the swapped order produced a
+        // misleading "expected:<actual size>" message on failure
+        Assert.assertEquals(k, resultObj.size());
+
+        final double[] result = new double[k];
+        for (int i = 0; i < k; i++) {
+            result[i] = resultObj.get(i).get();
+        }
+
+        final double[] answer = new double[] {250.29999999999998, 73.2};
+
+        Assert.assertArrayEquals(answer, result, 0.d);
+        selectKBest.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
new file mode 100644
index 0000000..f466dbc
--- /dev/null
+++ b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.list;
+
+import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator;
+import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator.QueueAggregationBuffer;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Unit tests for {@link UDAFToOrderedListEvaluator}: natural and reverse ordering plus the
+ * {@code -k} option (positive and negative, with and without {@code -reverse}), each exercised
+ * both on the values themselves and on values accompanied by a separate sort key.
+ *
+ * <p>
+ * Every test runs the same sequence (init in PARTIAL1 mode, reset, one iterate per row,
+ * terminate), which lives in {@link #aggregate(ObjectInspector[], String[], double[])}.
+ */
+@SuppressWarnings("deprecation")
+public class UDAFToOrderedListTest {
+
+    /** Input values, in the order they are fed to the evaluator. */
+    private static final String[] VALUES = new String[] {"banana", "apple", "candy"};
+    /** Sort keys for {@link #VALUES} with no ties. */
+    private static final double[] DISTINCT_KEYS = new double[] {0.7, 0.5, 0.8};
+    /** Sort keys for {@link #VALUES} where "banana" and "candy" share the key 0.7. */
+    private static final double[] DUPLICATED_KEYS = new double[] {0.7, 0.5, 0.7};
+
+    GenericUDAFEvaluator evaluator;
+    QueueAggregationBuffer agg;
+
+    @Before
+    public void setUp() throws Exception {
+        this.evaluator = new UDAFToOrderedListEvaluator();
+        this.agg = (QueueAggregationBuffer) evaluator.getNewAggregationBuffer();
+    }
+
+    @Test
+    public void testNaturalOrder() throws Exception {
+        // the values are strings, so declare a string inspector like every other test here
+        ObjectInspector[] inputOIs = new ObjectInspector[] {PrimitiveObjectInspectorFactory.javaStringObjectInspector};
+
+        List<Object> res = aggregate(inputOIs, VALUES, null);
+
+        assertOrdered(res, "apple", "banana", "candy");
+    }
+
+    @Test
+    public void testReverseOrder() throws Exception {
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                optionsOI("-reverse_order")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, null);
+
+        assertOrdered(res, "candy", "banana", "apple");
+    }
+
+    @Test
+    public void testTopK() throws Exception {
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector, optionsOI("-k 2")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, null);
+
+        assertOrdered(res, "candy", "banana");
+    }
+
+    @Test
+    public void testReverseTopK() throws Exception {
+        // = tail-k
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                optionsOI("-k 2 -reverse")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, null);
+
+        assertOrdered(res, "apple", "banana");
+    }
+
+    @Test
+    public void testTailK() throws Exception {
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector, optionsOI("-k -2")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, null);
+
+        assertOrdered(res, "apple", "banana");
+    }
+
+    @Test
+    public void testReverseTailK() throws Exception {
+        // = top-k
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                optionsOI("-k -2 -reverse")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, null);
+
+        assertOrdered(res, "candy", "banana");
+    }
+
+    @Test
+    public void testNaturalOrderWithKey() throws Exception {
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector};
+
+        List<Object> res = aggregate(inputOIs, VALUES, DUPLICATED_KEYS);
+
+        Assert.assertEquals(3, res.size());
+        Assert.assertEquals("apple", res.get(0));
+        assertBananaAndCandyInAnyOrder(res, 1, 2); // duplicated key (0.7)
+    }
+
+    @Test
+    public void testReverseOrderWithKey() throws Exception {
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+                optionsOI("-reverse_order")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, DUPLICATED_KEYS);
+
+        Assert.assertEquals(3, res.size());
+        assertBananaAndCandyInAnyOrder(res, 0, 1); // duplicated key (0.7)
+        Assert.assertEquals("apple", res.get(2));
+    }
+
+    @Test
+    public void testTopKWithKey() throws Exception {
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector, optionsOI("-k 2")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, DISTINCT_KEYS);
+
+        assertOrdered(res, "candy", "banana");
+    }
+
+    @Test
+    public void testReverseTopKWithKey() throws Exception {
+        // = tail-k
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+                optionsOI("-k 2 -reverse")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, DISTINCT_KEYS);
+
+        assertOrdered(res, "apple", "banana");
+    }
+
+    @Test
+    public void testTailKWithKey() throws Exception {
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector, optionsOI("-k -2")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, DISTINCT_KEYS);
+
+        assertOrdered(res, "apple", "banana");
+    }
+
+    @Test
+    public void testReverseTailKWithKey() throws Exception {
+        // = top-k
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+                optionsOI("-k -2 -reverse")};
+
+        List<Object> res = aggregate(inputOIs, VALUES, DISTINCT_KEYS);
+
+        assertOrdered(res, "candy", "banana");
+    }
+
+    /** Wraps an option string in a constant string inspector, as the evaluator expects it. */
+    private static ObjectInspector optionsOI(String options) {
+        return ObjectInspectorUtils.getConstantObjectInspector(
+            PrimitiveObjectInspectorFactory.javaStringObjectInspector, options);
+    }
+
+    /**
+     * Runs one aggregation on the shared evaluator and buffer.
+     *
+     * @param inputOIs inspectors passed to {@code init} in PARTIAL1 mode
+     * @param values one value per row
+     * @param keys one sort key per row, or {@code null} to pass only the value to {@code iterate}
+     * @return whatever {@code terminate} returns, viewed as a list
+     */
+    @SuppressWarnings("unchecked")
+    private List<Object> aggregate(ObjectInspector[] inputOIs, String[] values, double[] keys)
+            throws Exception {
+        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+        evaluator.reset(agg);
+
+        for (int i = 0; i < values.length; i++) {
+            final Object[] row = (keys == null) ? new Object[] {values[i]}
+                    : new Object[] {values[i], keys[i]};
+            evaluator.iterate(agg, row);
+        }
+
+        return (List<Object>) evaluator.terminate(agg);
+    }
+
+    /** Asserts that {@code actual} holds exactly {@code expected}, in that order. */
+    private static void assertOrdered(List<Object> actual, String... expected) {
+        Assert.assertEquals(expected.length, actual.size());
+        for (int i = 0; i < expected.length; i++) {
+            Assert.assertEquals(expected[i], actual.get(i));
+        }
+    }
+
+    /**
+     * Asserts that positions {@code first} and {@code second} hold "banana" and "candy" in either
+     * order; the two share a sort key, so their relative order is not specified. Uses
+     * {@code equals} rather than {@code ==}, which only worked for identical String instances.
+     */
+    private static void assertBananaAndCandyInAnyOrder(List<Object> actual, int first,
+            int second) {
+        if ("banana".equals(actual.get(first))) {
+            Assert.assertEquals("candy", actual.get(second));
+        } else {
+            Assert.assertEquals("candy", actual.get(first));
+            Assert.assertEquals("banana", actual.get(second));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
new file mode 100644
index 0000000..9289a02
--- /dev/null
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.map;
+
+import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.SortedMap;
+
+@SuppressWarnings("deprecation")
+public class UDAFToOrderedMapTest {
+
+    @Test
+    public void testNaturalOrder() throws Exception {
+        GenericUDAFEvaluator evaluator = new NaturalOrderedMapEvaluator();
+        NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector};
+
+        final double[] keys = new double[] {0.7, 0.5, 0.8};
+        final String[] values = new String[] {"banana", "apple", "candy"};
+
+        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+        evaluator.reset(agg);
+
+        for (int i = 0; i < keys.length; i++) {
+            evaluator.iterate(agg, new Object[] {keys[i], values[i]});
+        }
+
+        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Object[] sortedValues = res.values().toArray();
+
+        Assert.assertEquals(3, sortedValues.length);
+        Assert.assertEquals("apple", sortedValues[0]);
+        Assert.assertEquals("banana", sortedValues[1]);
+        Assert.assertEquals("candy", sortedValues[2]);
+    }
+
+    @Test
+    public void testReverseOrder() throws Exception {
+        GenericUDAFEvaluator evaluator = new ReverseOrderedMapEvaluator();
+        ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaBooleanObjectInspector};
+
+        final double[] keys = new double[] {0.7, 0.5, 0.8};
+        final String[] values = new String[] {"banana", "apple", "candy"};
+
+        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+        evaluator.reset(agg);
+
+        for (int i = 0; i < keys.length; i++) {
+            evaluator.iterate(agg, new Object[] {keys[i], values[i]});
+        }
+
+        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Object[] sortedValues = res.values().toArray();
+
+        Assert.assertEquals(3, sortedValues.length);
+        Assert.assertEquals("candy", sortedValues[0]);
+        Assert.assertEquals("banana", sortedValues[1]);
+        Assert.assertEquals("apple", sortedValues[2]);
+    }
+
+    @Test
+    public void testTopK() throws Exception {
+        GenericUDAFEvaluator evaluator = new TopKOrderedMapEvaluator();
+        TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaIntObjectInspector};
+
+        final double[] keys = new double[] {0.7, 0.5, 0.8};
+        final String[] values = new String[] {"banana", "apple", "candy"};
+        int size = 2;
+
+        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+        evaluator.reset(agg);
+
+        for (int i = 0; i < keys.length; i++) {
+            evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
+        }
+
+        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Object[] sortedValues = res.values().toArray();
+
+        Assert.assertEquals(size, sortedValues.length);
+        Assert.assertEquals("candy", sortedValues[0]);
+        Assert.assertEquals("banana", sortedValues[1]);
+    }
+
+    @Test
+    public void testTailK() throws Exception {
+        GenericUDAFEvaluator evaluator = new TailKOrderedMapEvaluator();
+        TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+        ObjectInspector[] inputOIs = new ObjectInspector[] {
+                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                PrimitiveObjectInspectorFactory.javaIntObjectInspector};
+
+        final double[] keys = new double[] {0.7, 0.5, 0.8};
+        final String[] values = new String[] {"banana", "apple", "candy"};
+        int size = -2;
+
+        evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+        evaluator.reset(agg);
+
+        for (int i = 0; i < keys.length; i++) {
+            evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
+        }
+
+        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Object[] sortedValues = res.values().toArray();
+
+        Assert.assertEquals(Math.abs(size), sortedValues.length);
+        Assert.assertEquals("apple", sortedValues[0]);
+        Assert.assertEquals("banana", sortedValues[1]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/eval/rank.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/eval/rank.md b/docs/gitbook/eval/rank.md
index ed1a44c..db681ac 100644
--- a/docs/gitbook/eval/rank.md
+++ b/docs/gitbook/eval/rank.md
@@ -28,6 +28,11 @@ Practical machine learning applications such as information retrieval and recomm
 
 This page focuses on evaluation of the results from such ranking problems.
 
+> #### Caution
+> In order to obtain a ranked list of items, this page introduces queries using `to_ordered_map()` such as `map_values(to_ordered_map(score, itemid, true))`. However, this kind of usage has a potential issue: multiple `itemid`-s (i.e., values) which have exactly the same `score` (i.e., key) will be aggregated to a single arbitrary `itemid`, because `to_ordered_map()` creates a key-value map which uses the duplicated `score` as key.
+>
+> Hence, if a map key could be shared by more than one map value, we recommend that you use `to_ordered_list(value, key, '-reverse')` instead of `map_values(to_ordered_map(key, value, true))`. The alternative approach is available from Hivemall v0.5-rc.1 or later.
+
 # Binary Response Measures
 
 In a context of ranking problem, **binary response** means that binary labels are assigned to items, and positive items are considered as *truth* observations.

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/misc/generic_funcs.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/generic_funcs.md b/docs/gitbook/misc/generic_funcs.md
index b3a0421..b27117f 100644
--- a/docs/gitbook/misc/generic_funcs.md
+++ b/docs/gitbook/misc/generic_funcs.md
@@ -83,6 +83,10 @@ This page describes a list of useful Hivemall generic functions.
 
 - `array_sum(array<NUMBER>)` - Returns an array<double> in which each element is summed up
 
+## List UDAF
+
+- `to_ordered_list(value [, const string options])` or `to_ordered_list(value, key [, const string options])` - Return list of values sorted by value itself or specific key
+
 # Bitset functions
 
 ## Bitset UDF
@@ -141,8 +145,7 @@ The compression level must be in range [-1,9]
 
 - `to_map(key, value)` - Convert two aggregated columns into a key-value map
 
-- `to_ordered_map(key, value [, const boolean reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
-
+- `to_ordered_map(key, value [, const int|boolean k|reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
 
 # MapReduce functions
 

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/misc/topk.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/topk.md b/docs/gitbook/misc/topk.md
index 6a80514..27cf7ad 100644
--- a/docs/gitbook/misc/topk.md
+++ b/docs/gitbook/misc/topk.md
@@ -379,3 +379,66 @@ FROM
 | 4  | 0.4432108402252197  | 3 | 26220 | 1 |
 | 5  | 0.44323229789733887 | 3 | 18541 | 0 |
 | .. | .. | .. | .. | .. |
+
+# Alternative approaches
+
+In order to utilize mapper-side aggregation and reduce computational cost of shuffling, you can use [`to_ordered_map`](./generic_funcs.md#map-udafs) or [`to_ordered_list`](./generic_funcs.md#list-udaf) to get top/tail-k elements instead of `each_top_k`.
+
+As long as `key` is unique in each `id`, the following queries return the same result:
+
+```sql
+with t as (
+  select
+    each_top_k(
+      10, id, key,
+      id, value
+    ) as (rank, key, id, value)
+  from (
+    select
+      *
+    from 
+      test
+    cluster by 
+      id
+  ) t
+)
+select 
+  id, collect_list(value) as topk
+from 
+  t
+group by
+  id
+```
+
+```sql
+with t as (
+  select
+    id, to_ordered_map(key, value, 10) as m
+  from 
+    test
+  group by
+    id
+)
+select 
+  id, collect_list(value) as topk
+from 
+  t
+lateral view explode(m) t as key, value
+group by
+  id
+```
+
+```sql
+select 
+  id, to_ordered_list(value, key, '-k 10') as topk
+from 
+  test
+group by
+  id
+```
+
+> #### Caution
+>
+> If `key` may be duplicated within an `id`, `to_ordered_map` behaves differently because key `K` is always unique in `Map<K, V>`.
+
+Similarly to `each_top_k`, tail-k can also be represented as: `to_ordered_map(key, value, -10)` and `to_ordered_list(value, key, '-k -10')`.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/recommend/item_based_cf.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/recommend/item_based_cf.md b/docs/gitbook/recommend/item_based_cf.md
index 9e4f7e4..053b225 100644
--- a/docs/gitbook/recommend/item_based_cf.md
+++ b/docs/gitbook/recommend/item_based_cf.md
@@ -437,6 +437,11 @@ from (
 
 In order to generate a list of recommended items, you can use either cooccurrence count or similarity as a relevance score.
 
+> #### Caution
+> In order to obtain a ranked list of items, this section introduces queries using `map_values(to_ordered_map(rank, rec_item))`. However, this kind of usage has a potential issue: multiple `rec_item`-s which have exactly the same `rank` will be aggregated to a single arbitrary `rec_item`, because `to_ordered_map()` creates a key-value map which uses the duplicated `rank` as key.
+>
+> Since such a situation is possible when `each_top_k()` is executed for different `userid`-s who have the same `cnt` or `similarity`, we recommend that you use `to_ordered_list(rec_item, rank, '-reverse')` instead of `map_values(to_ordered_map(rank, rec_item, true))`. The alternative approach is available from Hivemall v0.5-rc.1 or later.
+
 ### Cooccurrence-based
 
 ```sql

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/recommend/movielens_cf.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/recommend/movielens_cf.md b/docs/gitbook/recommend/movielens_cf.md
index faa555c..08268a8 100644
--- a/docs/gitbook/recommend/movielens_cf.md
+++ b/docs/gitbook/recommend/movielens_cf.md
@@ -21,6 +21,11 @@
 
 <!-- toc -->
 
+> #### Caution
+> In order to obtain a ranked list of items, this page introduces queries using `to_ordered_map()` such as `map_values(to_ordered_map(rating, movieid, true))`. However, this kind of usage has a potential issue: multiple `movieid`-s (i.e., values) which have exactly the same `rating` (i.e., key) will be aggregated to a single arbitrary `movieid`, because `to_ordered_map()` creates a key-value map which uses the duplicated `rating` as key.
+>
+> Hence, if a map key could be shared by more than one map value, we recommend that you use `to_ordered_list(value, key, '-reverse')` instead of `map_values(to_ordered_map(key, value, true))`. The alternative approach is available from Hivemall v0.5-rc.1 or later.
+
 # Compute movie-movie similarity
 
 [As we explained in the general introduction of item-based CF](item_based_cf.html#dimsum-approximated-all-pairs-cosine-similarity-computation.md), following query finds top-$$k$$ nearest-neighborhood movies for each movie:

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-all-as-permanent.hive
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all-as-permanent.hive b/resources/ddl/define-all-as-permanent.hive
index c2b38fb..cda33f9 100644
--- a/resources/ddl/define-all-as-permanent.hive
+++ b/resources/ddl/define-all-as-permanent.hive
@@ -467,6 +467,13 @@ DROP FUNCTION IF EXISTS to_ordered_map;
 CREATE FUNCTION to_ordered_map as 'hivemall.tools.map.UDAFToOrderedMap' USING JAR '${hivemall_jar}';
 
 ---------------------
+-- list functions --
+---------------------
+
+DROP FUNCTION IF EXISTS to_ordered_list;
+CREATE FUNCTION to_ordered_list as 'hivemall.tools.list.UDAFToOrderedList' USING JAR '${hivemall_jar}';
+
+---------------------
 -- Math functions --
 ---------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-all.hive
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all.hive b/resources/ddl/define-all.hive
index 89821f8..6e116ac 100644
--- a/resources/ddl/define-all.hive
+++ b/resources/ddl/define-all.hive
@@ -459,6 +459,13 @@ drop temporary function if exists to_ordered_map;
 create temporary function to_ordered_map as 'hivemall.tools.map.UDAFToOrderedMap';
 
 ---------------------
+-- list functions --
+---------------------
+
+drop temporary function if exists to_ordered_list;
+create temporary function to_ordered_list as 'hivemall.tools.list.UDAFToOrderedList';
+
+---------------------
 -- Math functions --
 ---------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-all.spark
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all.spark b/resources/ddl/define-all.spark
index b4926e3..d3eb3cd 100644
--- a/resources/ddl/define-all.spark
+++ b/resources/ddl/define-all.spark
@@ -458,6 +458,13 @@ sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS to_ordered_map")
 sqlContext.sql("CREATE TEMPORARY FUNCTION to_ordered_map AS 'hivemall.tools.map.UDAFToOrderedMap'")
 
 /**
+ * List functions
+ */
+
+sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS to_ordered_list")
+sqlContext.sql("CREATE TEMPORARY FUNCTION to_ordered_list AS 'hivemall.tools.list.UDAFToOrderedList'")
+
+/**
  * Math functions
  */
 

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-udfs.td.hql
----------------------------------------------------------------------
diff --git a/resources/ddl/define-udfs.td.hql b/resources/ddl/define-udfs.td.hql
index c7fdd49..2662260 100644
--- a/resources/ddl/define-udfs.td.hql
+++ b/resources/ddl/define-udfs.td.hql
@@ -177,6 +177,7 @@ create temporary function tree_export as 'hivemall.smile.tools.TreeExportUDF';
 create temporary function train_ffm as 'hivemall.fm.FieldAwareFactorizationMachineUDTF';
 create temporary function ffm_predict as 'hivemall.fm.FFMPredictGenericUDAF';
 create temporary function add_field_indicies as 'hivemall.ftvec.trans.AddFieldIndicesUDF';
+create temporary function to_ordered_list as 'hivemall.tools.list.UDAFToOrderedList';
 
 -- NLP features
 create temporary function tokenize_ja as 'hivemall.nlp.tokenizer.KuromojiUDF';


[2/5] incubator-hivemall git commit: Fixed to_ordered_map description

Posted by my...@apache.org.
Fixed to_ordered_map description


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/44528a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/44528a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/44528a83

Branch: refs/heads/master
Commit: 44528a83b065aa50255c751fb37041d1adbeb558
Parents: 9cd3c59
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:22:59 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:22:59 2017 +0900

----------------------------------------------------------------------
 docs/gitbook/misc/generic_funcs.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/44528a83/docs/gitbook/misc/generic_funcs.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/generic_funcs.md b/docs/gitbook/misc/generic_funcs.md
index b27117f..66b30e2 100644
--- a/docs/gitbook/misc/generic_funcs.md
+++ b/docs/gitbook/misc/generic_funcs.md
@@ -145,7 +145,7 @@ The compression level must be in range [-1,9]
 
 - `to_map(key, value)` - Convert two aggregated columns into a key-value map
 
-- `to_ordered_map(key, value [, const int|boolean k|reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
+- `to_ordered_map(key, value [, const int k|const boolean reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
 
 # MapReduce functions
 


[4/5] incubator-hivemall git commit: Refactored to_ordered_list and to_ordered_map UDAF

Posted by my...@apache.org.
Refactored to_ordered_list and to_ordered_map UDAF


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/69730f65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/69730f65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/69730f65

Branch: refs/heads/master
Commit: 69730f65d76b54890b141ee13567eb681c0374ae
Parents: 21a8781
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:26:49 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:27:23 2017 +0900

----------------------------------------------------------------------
 .../hivemall/tools/list/UDAFToOrderedList.java  | 116 +++++-----
 .../hivemall/tools/map/UDAFToOrderedMap.java    | 221 +++----------------
 .../tools/list/UDAFToOrderedListTest.java       |  32 ++-
 .../tools/map/UDAFToOrderedMapTest.java         |  40 ++--
 4 files changed, 132 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
index 16c966a..e88a16c 100644
--- a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
+++ b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
@@ -21,6 +21,23 @@ package hivemall.tools.list;
 import hivemall.utils.collections.BoundedPriorityQueue;
 import hivemall.utils.hadoop.HiveUtils;
 import hivemall.utils.lang.CommandLineUtils;
+import hivemall.utils.lang.NaturalComparator;
+import hivemall.utils.lang.Preconditions;
+import hivemall.utils.struct.Pair;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -33,25 +50,26 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.IntWritable;
 
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.*;
-
 /**
  * Return list of values sorted by value itself or specific key.
  */
-@Description(
-        name = "to_ordered_list",
-        value = "_FUNC_(value [, key, const string options]) - Return list of values sorted by value itself or specific key")
-public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
+@Description(name = "to_ordered_list",
+        value = "_FUNC_(PRIMITIVE value [, PRIMITIVE key, const string options])"
+                + " - Return list of values sorted by value itself or specific key")
+public final class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
     @Override
     public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
@@ -151,7 +169,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
             int k = 0;
             boolean reverseOrder = false;
-
             if (argOIs.length >= optionIndex + 1) {
                 String rawArgs = HiveUtils.getConstString(argOIs[optionIndex]);
                 cl = parseOptions(rawArgs);
@@ -161,18 +178,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 if (cl.hasOption("k")) {
                     k = Integer.parseInt(cl.getOptionValue("k"));
                     if (k == 0) {
-                        throw new UDFArgumentException("`k` must be nonzero: " + k);
+                        throw new UDFArgumentException("`k` must be non-zero value: " + k);
                     }
                 }
             }
-
             this.size = Math.abs(k);
 
-            if ((k > 0 && reverseOrder) || (k < 0 && !reverseOrder) || (k == 0 && !reverseOrder)) {
-                // reverse top-k, natural tail-k = ascending = natural order output = reverse order priority queue
+            if ((k > 0 && reverseOrder) || (k < 0 && reverseOrder == false)
+                    || (k == 0 && reverseOrder == false)) {
+                // top-k on reverse order = tail-k on natural order (so, top-k on descending)
                 this.reverseOrder = true;
-            } else { // (k > 0 && !reverseOrder) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
-                // natural top-k or reverse tail-k = descending = reverse order output = natural order priority queue
+            } else { // (k > 0 && reverseOrder == false) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
+                // top-k on natural order = tail-k on reverse order (so, top-k on ascending)
                 this.reverseOrder = false;
             }
 
@@ -190,7 +207,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                         || (argOIs.length == 3 && HiveUtils.isConstString(argOIs[2]));
 
                 if (sortByKey) {
-                    this.valueOI = argOIs[0];
+                    this.valueOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
                     this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[1]);
                 } else {
                     // sort values by value itself
@@ -230,20 +247,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
             return outputOI;
         }
 
+        @Nonnull
         private static StructObjectInspector internalMergeOI(@Nonnull ObjectInspector valueOI,
                 @Nonnull PrimitiveObjectInspector keyOI) {
-            ArrayList<String> fieldNames = new ArrayList<String>();
-            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+            List<String> fieldNames = new ArrayList<String>();
+            List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
 
             fieldNames.add("valueList");
             fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
             fieldNames.add("keyList");
             fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(keyOI)));
-
             fieldNames.add("size");
             fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
             fieldNames.add("reverseOrder");
             fieldOIs.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
 
@@ -295,10 +310,10 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 throws HiveException {
             QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
 
-            Map<String, List<Object>> tuples = myagg.drainQueue();
-            List<Object> valueList = tuples.get("value");
-            List<Object> keyList = tuples.get("key");
-            if (valueList.size() == 0) {
+            Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+            List<Object> keyList = tuples.getKey();
+            List<Object> valueList = tuples.getValue();
+            if (valueList.isEmpty()) {
                 return null;
             }
 
@@ -307,7 +322,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
             partialResult[1] = keyList;
             partialResult[2] = new IntWritable(myagg.size);
             partialResult[3] = new BooleanWritable(myagg.reverseOrder);
-
             return partialResult;
         }
 
@@ -345,11 +359,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
         }
 
         @Override
-        public Object terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+        public List<Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
             QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
-            Map<String, List<Object>> tuples = myagg.drainQueue();
-            return tuples.get("value");
+            Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+            return tuples.getValue();
         }
 
         static class QueueAggregationBuffer extends AbstractAggregationBuffer {
@@ -374,14 +388,14 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 this.reverseOrder = reverseOrder;
             }
 
-            void iterate(TupleWithKey tuple) {
+            void iterate(@Nonnull TupleWithKey tuple) {
                 if (queueHandler == null) {
                     initQueueHandler();
                 }
                 queueHandler.offer(tuple);
             }
 
-            void merge(List<Object> o_keyList, List<Object> o_valueList) {
+            void merge(@Nonnull List<Object> o_keyList, @Nonnull List<Object> o_valueList) {
                 if (queueHandler == null) {
                     initQueueHandler();
                 }
@@ -391,7 +405,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
             }
 
             @Nonnull
-            Map<String, List<Object>> drainQueue() {
+            Pair<List<Object>, List<Object>> drainQueue() {
                 int n = queueHandler.size();
                 final Object[] keys = new Object[n];
                 final Object[] values = new Object[n];
@@ -402,10 +416,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 }
                 queueHandler.clear();
 
-                Map<String, List<Object>> res = new HashMap<String, List<Object>>();
-                res.put("key", Arrays.asList(keys));
-                res.put("value", Arrays.asList(values));
-                return res;
+                return Pair.of(Arrays.asList(keys), Arrays.asList(values));
             }
 
             private void initQueueHandler() {
@@ -413,12 +424,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
                 if (reverseOrder) {
                     comparator = Collections.reverseOrder();
                 } else {
-                    comparator = new Comparator<TupleWithKey>() {
-                        @Override
-                        public int compare(TupleWithKey o1, TupleWithKey o2) {
-                            return o1.compareTo(o2);
-                        }
-                    };
+                    comparator = NaturalComparator.getInstance();
                 }
 
                 if (size > 0) {
@@ -436,10 +442,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
          */
         private static abstract class AbstractQueueHandler {
 
-            abstract void offer(TupleWithKey tuple);
+            abstract void offer(@Nonnull TupleWithKey tuple);
 
             abstract int size();
 
+            @Nullable
             abstract TupleWithKey poll();
 
             abstract void clear();
@@ -450,6 +457,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
             private static final int DEFAULT_INITIAL_CAPACITY = 11; // same as PriorityQueue
 
+            @Nonnull
             private final PriorityQueue<TupleWithKey> queue;
 
             QueueHandler(@Nonnull Comparator<TupleWithKey> comparator) {
@@ -480,6 +488,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
 
         private static final class BoundedQueueHandler extends AbstractQueueHandler {
 
+            @Nonnull
             private final BoundedPriorityQueue<TupleWithKey> queue;
 
             BoundedQueueHandler(int size, @Nonnull Comparator<TupleWithKey> comparator) {
@@ -509,24 +518,29 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
         }
 
         private static final class TupleWithKey implements Comparable<TupleWithKey> {
-            private Object key;
-            private Object value;
+            @Nonnull
+            private final Object key;
+            @Nonnull
+            private final Object value;
 
-            TupleWithKey(Object key, Object value) {
-                this.key = key;
-                this.value = value;
+            TupleWithKey(@CheckForNull Object key, @CheckForNull Object value) {
+                this.key = Preconditions.checkNotNull(key);
+                this.value = Preconditions.checkNotNull(value);
             }
 
+            @Nonnull
             Object getKey() {
                 return key;
             }
 
+            @Nonnull
             Object getValue() {
                 return value;
             }
 
             @Override
             public int compareTo(TupleWithKey o) {
+                @SuppressWarnings("unchecked")
                 Comparable<? super Object> k = (Comparable<? super Object>) key;
                 return k.compareTo(o.getKey());
             }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index 3e6caa4..a6b547f 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -18,14 +18,14 @@
  */
 package hivemall.tools.map;
 
+import hivemall.utils.collections.maps.BoundedSortedMap;
 import hivemall.utils.hadoop.HiveUtils;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Map;
-import java.util.SortedMap;
 import java.util.TreeMap;
 
+import javax.annotation.Nonnegative;
+
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
@@ -34,17 +34,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.IntWritable;
-
-import javax.annotation.Nonnull;
 
 /**
  * Convert two aggregated columns into a sorted key-value map.
@@ -52,13 +42,13 @@ import javax.annotation.Nonnull;
 @Description(name = "to_ordered_map",
         value = "_FUNC_(key, value [, const int k|const boolean reverseOrder=false]) "
                 + "- Convert two aggregated columns into an ordered key-value map")
-public class UDAFToOrderedMap extends UDAFToMap {
+public final class UDAFToOrderedMap extends UDAFToMap {
 
     @Override
     public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
             throws SemanticException {
         @SuppressWarnings("deprecation")
-        TypeInfo[] typeInfo = info.getParameters();
+        final TypeInfo[] typeInfo = info.getParameters();
         if (typeInfo.length != 2 && typeInfo.length != 3) {
             throw new UDFArgumentTypeException(typeInfo.length - 1,
                 "Expecting two or three arguments: " + typeInfo.length);
@@ -81,6 +71,7 @@ public class UDAFToOrderedMap extends UDAFToMap {
                     throw new UDFArgumentException("Map size must be nonzero: " + size);
                 }
                 reverseOrder = (size > 0); // positive size => top-k
+                size = Math.abs(size);
             } else {
                 throw new UDFArgumentTypeException(2,
                     "The third argument must be boolean or integer type: "
@@ -89,205 +80,53 @@ public class UDAFToOrderedMap extends UDAFToMap {
         }
 
         if (reverseOrder) { // descending
-            if (size != 0) {
-                return new TopKOrderedMapEvaluator();
-            }
-            return new ReverseOrderedMapEvaluator();
+            return new DescendingMapEvaluator(size);
         } else { // ascending
-            if (size != 0) {
-                return new TailKOrderedMapEvaluator();
-            }
-            return new NaturalOrderedMapEvaluator();
+            return new AscendingMapEvaluator(size);
         }
     }
 
-    public static class NaturalOrderedMapEvaluator extends UDAFToMapEvaluator {
-
-        @Override
-        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
-        }
-
-    }
+    public static final class AscendingMapEvaluator extends UDAFToMapEvaluator {
 
-    public static class ReverseOrderedMapEvaluator extends UDAFToMapEvaluator {
+        private final int size;
 
-        @Override
-        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
-                Collections.reverseOrder());
-        }
-
-    }
-
-    public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
-
-        protected PrimitiveObjectInspector inputKeyOI;
-        protected ObjectInspector inputValueOI;
-        protected StandardMapObjectInspector partialMapOI;
-        protected PrimitiveObjectInspector sizeOI;
-
-        protected StructObjectInspector internalMergeOI;
-
-        protected StructField partialMapField;
-        protected StructField sizeField;
-
-        @Override
-        public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
-            super.init(mode, argOIs);
-
-            // initialize input
-            if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
-                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
-                this.inputValueOI = argOIs[1];
-                this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
-            } else {// from partial aggregation
-                StructObjectInspector soi = (StructObjectInspector) argOIs[0];
-                this.internalMergeOI = soi;
-
-                this.partialMapField = soi.getStructFieldRef("partialMap");
-                // re-extract input key/value OIs
-                StandardMapObjectInspector partialMapOI = (StandardMapObjectInspector) partialMapField.getFieldObjectInspector();
-                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(partialMapOI.getMapKeyObjectInspector());
-                this.inputValueOI = partialMapOI.getMapValueObjectInspector();
-
-                this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
-                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
-                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
-
-                this.sizeField = soi.getStructFieldRef("size");
-                this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
-            }
-
-            // initialize output
-            final ObjectInspector outputOI;
-            if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
-                outputOI = internalMergeOI(inputKeyOI, inputValueOI);
-            } else {// terminate
-                outputOI = ObjectInspectorFactory.getStandardMapObjectInspector(
-                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
-                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
-            }
-            return outputOI;
-        }
-
-        private static StructObjectInspector internalMergeOI(
-                @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
-            ArrayList<String> fieldNames = new ArrayList<String>();
-            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-
-            fieldNames.add("partialMap");
-            fieldOIs.add(ObjectInspectorFactory.getStandardMapObjectInspector(
-                ObjectInspectorUtils.getStandardObjectInspector(keyOI),
-                ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
-            fieldNames.add("size");
-            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
-            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
-        }
-
-        static class MapAggregationBuffer extends AbstractAggregationBuffer {
-            Map<Object, Object> container;
-            int size;
-
-            MapAggregationBuffer() {
-                super();
-            }
+        AscendingMapEvaluator(@Nonnegative int size) {
+            super();
+            this.size = size;
         }
 
         @Override
         public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            myagg.container = new TreeMap<Object, Object>(Collections.reverseOrder());
-            myagg.size = Integer.MAX_VALUE;
-        }
-
-        @Override
-        public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
-            MapAggregationBuffer myagg = new MapAggregationBuffer();
-            reset(myagg);
-            return myagg;
-        }
-
-        @Override
-        public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
-                Object[] parameters) throws HiveException {
-            assert (parameters.length == 3);
-
-            if (parameters[0] == null) {
-                return;
+            if (size == 0) {
+                ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
+            } else {
+                ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size);
             }
-
-            Object key = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputKeyOI);
-            Object value = ObjectInspectorUtils.copyToStandardObject(parameters[1], inputValueOI);
-            int size = Math.abs(HiveUtils.getInt(parameters[2], sizeOI)); // size could be negative for tail-k
-
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            myagg.container.put(key, value);
-            myagg.size = size;
         }
 
-        @Override
-        public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-
-            Object[] partialResult = new Object[2];
-            partialResult[0] = myagg.container;
-            partialResult[1] = new IntWritable(myagg.size);
-
-            return partialResult;
-        }
+    }
 
-        @Override
-        public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
-                throws HiveException {
-            if (partial == null) {
-                return;
-            }
+    public static final class DescendingMapEvaluator extends UDAFToMapEvaluator {
 
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+        private final int size;
 
-            Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
-            Map<?, ?> partialMap = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
-            for (Map.Entry<?, ?> e : partialMap.entrySet()) {
-                Object key = ObjectInspectorUtils.copyToStandardObject(e.getKey(), inputKeyOI);
-                Object value = ObjectInspectorUtils.copyToStandardObject(e.getValue(), inputValueOI);
-                myagg.container.put(key, value);
-            }
-
-            Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
-            int size = HiveUtils.getInt(sizeObj, sizeOI);
-            myagg.size = size;
+        DescendingMapEvaluator(int size) {
+            super();
+            this.size = size;
         }
 
         @Override
-        public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
-                throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            if (myagg.size < myagg.container.size()) {
-                Object toKey = myagg.container.keySet().toArray()[myagg.size];
-                return ((SortedMap<Object, Object>) myagg.container).headMap(toKey);
-            }
-            return myagg.container;
-        }
-
-    }
-
-    public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
-
-        @Override
         public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
-            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-            myagg.container = new TreeMap<Object, Object>();
-            myagg.size = Integer.MAX_VALUE;
+            if (size == 0) {
+                ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
+                    Collections.reverseOrder());
+            } else {
+                ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size,
+                    true);
+            }
         }
 
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
index f466dbc..c3039d1 100644
--- a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
+++ b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
@@ -21,22 +21,20 @@ package hivemall.tools.list;
 import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator;
 import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator.QueueAggregationBuffer;
 
+import java.util.List;
+
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.List;
-
-@SuppressWarnings("deprecation")
 public class UDAFToOrderedListTest {
 
-    GenericUDAFEvaluator evaluator;
-    QueueAggregationBuffer agg;
+    private UDAFToOrderedListEvaluator evaluator;
+    private QueueAggregationBuffer agg;
 
     @Before
     public void setUp() throws Exception {
@@ -57,7 +55,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(3, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -81,7 +79,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(3, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -105,7 +103,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -129,7 +127,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -152,7 +150,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -176,7 +174,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -199,7 +197,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(3, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -257,7 +255,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));
@@ -283,7 +281,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -308,7 +306,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("apple", res.get(0));
@@ -334,7 +332,7 @@ public class UDAFToOrderedListTest {
             evaluator.iterate(agg, new Object[] {values[i], keys[i]});
         }
 
-        List<Object> res = (List<Object>) evaluator.terminate(agg);
+        List<Object> res = evaluator.terminate(agg);
 
         Assert.assertEquals(2, res.size());
         Assert.assertEquals("candy", res.get(0));

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
index 9289a02..61642f1 100644
--- a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -18,27 +18,23 @@
  */
 package hivemall.tools.map;
 
-import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.AscendingMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.DescendingMapEvaluator;
+
+import java.util.SortedMap;
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.SortedMap;
-
-@SuppressWarnings("deprecation")
 public class UDAFToOrderedMapTest {
 
     @Test
     public void testNaturalOrder() throws Exception {
-        GenericUDAFEvaluator evaluator = new NaturalOrderedMapEvaluator();
-        NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        AscendingMapEvaluator evaluator = new AscendingMapEvaluator(0);
+        AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -61,12 +57,14 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals("apple", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
         Assert.assertEquals("candy", sortedValues[2]);
+
+        evaluator.close();
     }
 
     @Test
     public void testReverseOrder() throws Exception {
-        GenericUDAFEvaluator evaluator = new ReverseOrderedMapEvaluator();
-        ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        DescendingMapEvaluator evaluator = new DescendingMapEvaluator(0);
+        DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -90,12 +88,15 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals("candy", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
         Assert.assertEquals("apple", sortedValues[2]);
+
+        evaluator.close();
     }
 
     @Test
     public void testTopK() throws Exception {
-        GenericUDAFEvaluator evaluator = new TopKOrderedMapEvaluator();
-        TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        int size = 2;
+        DescendingMapEvaluator evaluator = new DescendingMapEvaluator(size);
+        DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -104,7 +105,6 @@ public class UDAFToOrderedMapTest {
 
         final double[] keys = new double[] {0.7, 0.5, 0.8};
         final String[] values = new String[] {"banana", "apple", "candy"};
-        int size = 2;
 
         evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
         evaluator.reset(agg);
@@ -119,12 +119,15 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals(size, sortedValues.length);
         Assert.assertEquals("candy", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
+
+        evaluator.close();
     }
 
     @Test
     public void testTailK() throws Exception {
-        GenericUDAFEvaluator evaluator = new TailKOrderedMapEvaluator();
-        TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        int size = -2;
+        AscendingMapEvaluator evaluator = new AscendingMapEvaluator(Math.abs(size));
+        AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -133,7 +136,6 @@ public class UDAFToOrderedMapTest {
 
         final double[] keys = new double[] {0.7, 0.5, 0.8};
         final String[] values = new String[] {"banana", "apple", "candy"};
-        int size = -2;
 
         evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
         evaluator.reset(agg);
@@ -148,6 +150,8 @@ public class UDAFToOrderedMapTest {
         Assert.assertEquals(Math.abs(size), sortedValues.length);
         Assert.assertEquals("apple", sortedValues[0]);
         Assert.assertEquals("banana", sortedValues[1]);
+
+        evaluator.close();
     }
 
 }



[3/5] incubator-hivemall git commit: Added utility classes and methods

Posted by my...@apache.org.
Added utility classes and methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/21a87814
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/21a87814
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/21a87814

Branch: refs/heads/master
Commit: 21a87814895c9c63479a049387dd53b34cc5c56e
Parents: 44528a8
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:24:25 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:24:25 2017 +0900

----------------------------------------------------------------------
 .../collections/maps/BoundedSortedMap.java      |  59 ++++++++++
 .../hivemall/utils/lang/NaturalComparator.java  |  48 ++++++++
 .../java/hivemall/utils/lang/StringUtils.java   |  17 +++
 .../main/java/hivemall/utils/struct/Pair.java   |  38 +++++++
 .../collections/BoundedPriorityQueueTest.java   | 114 +++++++++++++++++++
 .../collections/maps/BoundedSortedMapTest.java  |  84 ++++++++++++++
 6 files changed, 360 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java b/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java
new file mode 100644
index 0000000..b1bf806
--- /dev/null
+++ b/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.collections.maps;
+
+import hivemall.utils.lang.Preconditions;
+
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nullable;
+
+public final class BoundedSortedMap<K, V> extends TreeMap<K, V> {
+    private static final long serialVersionUID = 4580890152997313541L;
+
+    private final int bound;
+
+    public BoundedSortedMap(@Nonnegative int size) {
+        this(size, false);
+    }
+
+    public BoundedSortedMap(@Nonnegative int size, boolean reverseOrder) {
+        super(reverseOrder ? Collections.reverseOrder() : null);
+        Preconditions.checkArgument(size > 0, "size must be greater than zero: " + size);
+        this.bound = size;
+    }
+
+    @Nullable
+    public V put(@CheckForNull final K key, @Nullable final V value) {
+        final V old = super.put(key, value);
+        if (size() > bound) {
+            Entry<K, V> e = pollLastEntry();
+            if (e == null) {
+                return null;
+            }
+            return e.getValue();
+        }
+        return old;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/lang/NaturalComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/lang/NaturalComparator.java b/core/src/main/java/hivemall/utils/lang/NaturalComparator.java
new file mode 100644
index 0000000..d451f1b
--- /dev/null
+++ b/core/src/main/java/hivemall/utils/lang/NaturalComparator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.lang;
+
+import java.util.Comparator;
+
+import javax.annotation.Nonnull;
+
+public final class NaturalComparator<T extends Comparable<? super T>> implements Comparator<T> {
+
+    @SuppressWarnings("rawtypes")
+    private final static NaturalComparator INSTANCE = new NaturalComparator();
+
+    private NaturalComparator() {}// avoid instantiation
+
+    @Override
+    public int compare(T o1, T o2) {
+        return o1.compareTo(o2);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nonnull
+    public final static <T extends Comparable<? super T>> Comparator<T> getInstance() {
+        return (Comparator<T>) INSTANCE;
+    }
+
+    @Nonnull
+    public final static <T extends Comparable<? super T>> Comparator<T> newInstance() {
+        return new NaturalComparator<T>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/lang/StringUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/lang/StringUtils.java b/core/src/main/java/hivemall/utils/lang/StringUtils.java
index 48e137f..5b66dd1 100644
--- a/core/src/main/java/hivemall/utils/lang/StringUtils.java
+++ b/core/src/main/java/hivemall/utils/lang/StringUtils.java
@@ -250,5 +250,22 @@ public final class StringUtils {
         return builder.toString();
     }
 
+    /**
+     * Compares two possibly-{@code null} strings, ordering {@code null} before any
+     * non-null string.
+     *
+     * @return the result of the three-argument {@code compare} with {@code nullIsLess == true}
+     */
+    public static int compare(@Nullable final String o1, @Nullable final String o2) {
+        final boolean nullIsLess = true;
+        return compare(o1, o2, nullIsLess);
+    }
+
+    /**
+     * Compares two possibly-{@code null} strings.
+     *
+     * @param o1 left operand, may be {@code null}
+     * @param o2 right operand, may be {@code null}
+     * @param nullIsLess if {@code true} a {@code null} operand sorts before a non-null one,
+     *        otherwise after it
+     * @return 0 when both references are identical (including both {@code null}), -1 or 1 when
+     *         exactly one operand is {@code null}, otherwise {@code o1.compareTo(o2)}
+     */
+    public static int compare(@Nullable final String o1, @Nullable final String o2,
+            final boolean nullIsLess) {
+        if (o1 != null && o2 != null) {
+            // identical references short-circuit without a character comparison
+            return (o1 == o2) ? 0 : o1.compareTo(o2);
+        }
+        if (o1 == o2) {
+            // both operands are null
+            return 0;
+        }
+        final int whenLeftIsNull = nullIsLess ? -1 : 1;
+        return (o1 == null) ? whenLeftIsNull : -whenLeftIsNull;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/struct/Pair.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/struct/Pair.java b/core/src/main/java/hivemall/utils/struct/Pair.java
new file mode 100644
index 0000000..17737ed
--- /dev/null
+++ b/core/src/main/java/hivemall/utils/struct/Pair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.struct;
+
+import java.util.AbstractMap;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * A key-value pair implemented on top of {@link AbstractMap.SimpleEntry}.
+ */
+public class Pair<K, V> extends AbstractMap.SimpleEntry<K, V> {
+    private static final long serialVersionUID = 6411527075103472113L;
+
+    /**
+     * Creates a pair holding the given key and value; both are declared nullable.
+     */
+    public Pair(@Nullable K key, @Nullable V value) {
+        super(key, value);
+    }
+
+    /**
+     * Static factory equivalent to calling the constructor with the same arguments.
+     */
+    @Nonnull
+    public static <K, V> Pair<K, V> of(@Nullable K key, @Nullable V value) {
+        final Pair<K, V> pair = new Pair<K, V>(key, value);
+        return pair;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java b/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java
new file mode 100644
index 0000000..1220d76
--- /dev/null
+++ b/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.collections;
+
+import hivemall.utils.lang.NaturalComparator;
+import hivemall.utils.lang.StringUtils;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BoundedPriorityQueue}, each using a queue constructed with a bound of 3
+ * and a different comparator.
+ */
+public class BoundedPriorityQueueTest {
+
+    /**
+     * Ascending integer comparator: the assertions expect the three largest offered values
+     * (3, 3, 4) to remain in the queue and to be polled in ascending order.
+     */
+    @Test
+    public void testTop3() {
+        BoundedPriorityQueue<Integer> queue = new BoundedPriorityQueue<Integer>(3,
+            new Comparator<Integer>() {
+                @Override
+                public int compare(Integer o1, Integer o2) {
+                    return Integer.compare(o1, o2);
+                }
+            });
+        Assert.assertTrue(queue.offer(1));
+        Assert.assertTrue(queue.offer(4));
+        Assert.assertTrue(queue.offer(3));
+        Assert.assertTrue(queue.offer(2));
+        // the only offer in this test that is expected to be rejected
+        Assert.assertFalse(queue.offer(1));
+        Assert.assertTrue(queue.offer(2));
+        Assert.assertTrue(queue.offer(3));
+
+        Assert.assertEquals(3, queue.size());
+
+        Assert.assertEquals(Integer.valueOf(3), queue.peek());
+        Assert.assertEquals(Integer.valueOf(3), queue.poll());
+        Assert.assertEquals(Integer.valueOf(3), queue.poll());
+        Assert.assertEquals(Integer.valueOf(4), queue.poll());
+        // an exhausted queue is expected to return null from poll()
+        Assert.assertNull(queue.poll());
+        Assert.assertEquals(0, queue.size());
+    }
+
+    /**
+     * Reversed comparator: the assertions expect the three smallest offered values (1, 1, 2)
+     * to remain in the queue and to be polled in descending order.
+     */
+    @Test
+    public void testTail3() {
+        BoundedPriorityQueue<Integer> queue = new BoundedPriorityQueue<Integer>(3,
+            Collections.<Integer>reverseOrder());
+        Assert.assertTrue(queue.offer(1));
+        Assert.assertTrue(queue.offer(4));
+        Assert.assertTrue(queue.offer(3));
+        Assert.assertTrue(queue.offer(2));
+        Assert.assertTrue(queue.offer(1));
+        Assert.assertTrue(queue.offer(2));
+        // the only offer in this test that is expected to be rejected
+        Assert.assertFalse(queue.offer(3));
+
+        Assert.assertEquals(3, queue.size());
+
+        Assert.assertEquals(Integer.valueOf(2), queue.peek());
+        Assert.assertEquals(Integer.valueOf(2), queue.poll());
+        Assert.assertEquals(Integer.valueOf(1), queue.poll());
+        Assert.assertEquals(Integer.valueOf(1), queue.poll());
+        Assert.assertNull(queue.poll());
+        Assert.assertEquals(0, queue.size());
+    }
+
+    /**
+     * String elements ordered through {@link StringUtils#compare(String, String)}: of the four
+     * offered values only "B", "C" and "D" are expected to be polled, in that order.
+     */
+    @Test
+    public void testString1() {
+        BoundedPriorityQueue<String> queue = new BoundedPriorityQueue<>(3,
+            new Comparator<String>() {
+                @Override
+                public int compare(String o1, String o2) {
+                    return StringUtils.compare(o1, o2);
+                }
+            });
+        queue.offer("B");
+        queue.offer("A");
+        queue.offer("C");
+        queue.offer("D");
+        Assert.assertEquals("B", queue.poll());
+        Assert.assertEquals("C", queue.poll());
+        Assert.assertEquals("D", queue.poll());
+        Assert.assertNull(queue.poll());
+    }
+
+    /**
+     * Same scenario as {@link #testString1()} but ordered by
+     * {@link NaturalComparator#getInstance()}; the expected poll sequence is identical.
+     */
+    @Test
+    public void testString2() {
+        BoundedPriorityQueue<String> queue = new BoundedPriorityQueue<>(3,
+            NaturalComparator.<String>getInstance());
+        queue.offer("B");
+        queue.offer("A");
+        queue.offer("C");
+        queue.offer("D");
+        Assert.assertEquals("B", queue.poll());
+        Assert.assertEquals("C", queue.poll());
+        Assert.assertEquals("D", queue.poll());
+        Assert.assertNull(queue.poll());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java b/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java
new file mode 100644
index 0000000..ce376cf
--- /dev/null
+++ b/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.collections.maps;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BoundedSortedMap}, each using a map constructed with a bound of 3.
+ */
+public class BoundedSortedMapTest {
+
+    /**
+     * Single-argument constructor: the assertions expect keys 1, 2 and 3 to remain, iterated in
+     * ascending key order, each holding the value of its latest {@code put}.
+     */
+    @Test
+    public void testNaturalOrderTop3() {
+        // natural order = ascending
+        SortedMap<Integer, Double> map = new BoundedSortedMap<Integer, Double>(3);
+        Assert.assertNull(map.put(1, 1.d));
+        // re-putting an existing key is expected to return the previous value
+        Assert.assertEquals(Double.valueOf(1.d), map.put(1, 1.1d));
+        Assert.assertNull(map.put(4, 4.d));
+        Assert.assertNull(map.put(2, 2.d));
+        Assert.assertEquals(Double.valueOf(2.d), map.put(2, 2.2d));
+        // fourth distinct key: the expected return is 4.0, the value stored under key 4,
+        // and key 4 is absent from the final iteration below
+        Assert.assertEquals(Double.valueOf(4.d), map.put(3, 3.d));
+        Assert.assertEquals(Double.valueOf(3.d), map.put(3, 3.3d));
+
+        Assert.assertEquals(3, map.size());
+
+        Iterator<Entry<Integer, Double>> itor = map.entrySet().iterator();
+        Entry<Integer, Double> e = itor.next();
+        Assert.assertEquals(Integer.valueOf(1), e.getKey());
+        Assert.assertEquals(Double.valueOf(1.1d), e.getValue());
+        e = itor.next();
+        Assert.assertEquals(Integer.valueOf(2), e.getKey());
+        Assert.assertEquals(Double.valueOf(2.2d), e.getValue());
+        e = itor.next();
+        Assert.assertEquals(Integer.valueOf(3), e.getKey());
+        Assert.assertEquals(Double.valueOf(3.3d), e.getValue());
+        Assert.assertFalse(itor.hasNext());
+    }
+
+    /**
+     * Constructor called with {@code true} as second argument: the assertions expect keys 4, 3
+     * and 2 to remain, iterated in descending key order.
+     */
+    @Test
+    public void testReverseOrderTop3() {
+        // reverse order = descending
+        SortedMap<Integer, Double> map = new BoundedSortedMap<Integer, Double>(3, true);
+        Assert.assertNull(map.put(1, 1.d));
+        Assert.assertEquals(Double.valueOf(1.d), map.put(1, 1.1d));
+        Assert.assertNull(map.put(4, 4.d));
+        Assert.assertNull(map.put(2, 2.d));
+        Assert.assertEquals(Double.valueOf(2.d), map.put(2, 2.2d));
+        // fourth distinct key: the expected return is 1.1, the value stored under key 1,
+        // and key 1 is absent from the final iteration below
+        Assert.assertEquals(Double.valueOf(1.1d), map.put(3, 3.d));
+        Assert.assertEquals(Double.valueOf(3.d), map.put(3, 3.3d));
+
+        Assert.assertEquals(3, map.size());
+
+        Iterator<Entry<Integer, Double>> itor = map.entrySet().iterator();
+        Entry<Integer, Double> e = itor.next();
+        Assert.assertEquals(Integer.valueOf(4), e.getKey());
+        Assert.assertEquals(Double.valueOf(4.d), e.getValue());
+        e = itor.next();
+        Assert.assertEquals(Integer.valueOf(3), e.getKey());
+        Assert.assertEquals(Double.valueOf(3.3d), e.getValue());
+        e = itor.next();
+        Assert.assertEquals(Integer.valueOf(2), e.getKey());
+        Assert.assertEquals(Double.valueOf(2.2d), e.getValue());
+        Assert.assertFalse(itor.hasNext());
+    }
+
+}


[5/5] incubator-hivemall git commit: Close #114: refactored to_ordered_(map|list) of PR #108

Posted by my...@apache.org.
Close #114: refactored to_ordered_(map|list) of PR #108


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/688daa5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/688daa5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/688daa5f

Branch: refs/heads/master
Commit: 688daa5f8e6a87fad2abf3b47a8a8353bc1c792a
Parents: 69730f6
Author: Makoto Yui <my...@apache.org>
Authored: Wed Sep 13 21:12:41 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Wed Sep 13 21:12:41 2017 +0900

----------------------------------------------------------------------
 .../hivemall/tools/map/UDAFToOrderedMap.java    | 240 ++++++++++++++++---
 .../java/hivemall/utils/hadoop/HiveUtils.java   |  12 +
 .../tools/map/UDAFToOrderedMapTest.java         |  36 +--
 docs/gitbook/misc/generic_funcs.md              |  54 ++++-
 4 files changed, 292 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index a6b547f..5cdac4d 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -20,11 +20,17 @@ package hivemall.tools.map;
 
 import hivemall.utils.collections.maps.BoundedSortedMap;
 import hivemall.utils.hadoop.HiveUtils;
+import hivemall.utils.lang.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 
 import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -33,8 +39,16 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.IntWritable;
 
 /**
  * Convert two aggregated columns into a sorted key-value map.
@@ -63,70 +77,232 @@ public final class UDAFToOrderedMap extends UDAFToMap {
         int size = 0;
         if (typeInfo.length == 3) {
             ObjectInspector[] argOIs = info.getParameterObjectInspectors();
-            if (HiveUtils.isBooleanTypeInfo(typeInfo[2])) {
-                reverseOrder = HiveUtils.getConstBoolean(argOIs[2]);
-            } else if (HiveUtils.isIntegerTypeInfo(typeInfo[2])) {
-                size = HiveUtils.getConstInt(argOIs[2]);
+            ObjectInspector argOI2 = argOIs[2];
+            if (HiveUtils.isConstBoolean(argOI2)) {
+                reverseOrder = HiveUtils.getConstBoolean(argOI2);
+            } else if (HiveUtils.isConstInteger(argOI2)) {
+                size = HiveUtils.getConstInt(argOI2);
                 if (size == 0) {
-                    throw new UDFArgumentException("Map size must be nonzero: " + size);
+                    throw new UDFArgumentException("Map size must be non-zero value: " + size);
                 }
                 reverseOrder = (size > 0); // positive size => top-k
-                size = Math.abs(size);
             } else {
                 throw new UDFArgumentTypeException(2,
-                    "The third argument must be boolean or integer type: "
-                            + typeInfo[2].getTypeName());
+                    "The third argument must be boolean or int type: " + typeInfo[2].getTypeName());
             }
         }
 
         if (reverseOrder) { // descending
-            return new DescendingMapEvaluator(size);
+            if (size == 0) {
+                return new ReverseOrderedMapEvaluator();
+            } else {
+                return new TopKOrderedMapEvaluator();
+            }
         } else { // ascending
-            return new AscendingMapEvaluator(size);
+            if (size == 0) {
+                return new NaturalOrderedMapEvaluator();
+            } else {
+                return new TailKOrderedMapEvaluator();
+            }
         }
     }
 
-    public static final class AscendingMapEvaluator extends UDAFToMapEvaluator {
-
-        private final int size;
+    public static class NaturalOrderedMapEvaluator extends UDAFToMapEvaluator {
 
-        AscendingMapEvaluator(@Nonnegative int size) {
-            super();
-            this.size = size;
+        @Override
+        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
         }
 
+    }
+
+    public static class ReverseOrderedMapEvaluator extends UDAFToMapEvaluator {
+
         @Override
         public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
-            if (size == 0) {
-                ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
-            } else {
-                ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size);
-            }
+            ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
+                Collections.reverseOrder());
         }
 
     }
 
-    public static final class DescendingMapEvaluator extends UDAFToMapEvaluator {
+    public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
+
+        protected PrimitiveObjectInspector inputKeyOI;
+        protected ObjectInspector inputValueOI;
+        protected MapObjectInspector partialMapOI;
+        protected PrimitiveObjectInspector sizeOI;
+
+        protected StructObjectInspector internalMergeOI;
+
+        protected StructField partialMapField;
+        protected StructField sizeField;
 
-        private final int size;
+        @Override
+        public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
+            super.init(mode, argOIs);
+
+            // initialize input
+            if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
+                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+                this.inputValueOI = argOIs[1];
+                this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
+            } else {// from partial aggregation
+                StructObjectInspector soi = (StructObjectInspector) argOIs[0];
+                this.internalMergeOI = soi;
+
+                this.partialMapField = soi.getStructFieldRef("partialMap");
+                // re-extract input key/value OIs
+                MapObjectInspector partialMapOI = (MapObjectInspector) partialMapField.getFieldObjectInspector();
+                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(partialMapOI.getMapKeyObjectInspector());
+                this.inputValueOI = partialMapOI.getMapValueObjectInspector();
+
+                this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
+                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+
+                this.sizeField = soi.getStructFieldRef("size");
+                this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
+            }
 
-        DescendingMapEvaluator(int size) {
-            super();
-            this.size = size;
+            // initialize output
+            final ObjectInspector outputOI;
+            if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
+                outputOI = internalMergeOI(inputKeyOI, inputValueOI);
+            } else {// terminate
+                outputOI = ObjectInspectorFactory.getStandardMapObjectInspector(
+                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+            }
+            return outputOI;
+        }
+
+        @Nonnull
+        private static StructObjectInspector internalMergeOI(
+                @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
+            List<String> fieldNames = new ArrayList<String>();
+            List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+
+            fieldNames.add("partialMap");
+            fieldOIs.add(ObjectInspectorFactory.getStandardMapObjectInspector(
+                ObjectInspectorUtils.getStandardObjectInspector(keyOI),
+                ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
+
+            fieldNames.add("size");
+            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+
+            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
+        }
+
+        /** Aggregation state: the map being built and the bound it was created with. */
+        static class MapAggregationBuffer extends AbstractAggregationBuffer {
+            // null after reset(); assigned by initBuffer()
+            @Nullable
+            Map<Object, Object> container;
+            // 0 after reset(); set by initBuffer() to the bound it received
+            int size;
+
+            MapAggregationBuffer() {
+                super();
+            }
         }
 
         @Override
         public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
                 throws HiveException {
-            if (size == 0) {
-                ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
-                    Collections.reverseOrder());
-            } else {
-                ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size,
-                    true);
+            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+            myagg.container = null;
+            myagg.size = 0;
+        }
+
+        /**
+         * Allocates a buffer and passes it through {@code reset} before returning it.
+         */
+        @Override
+        public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
+            final MapAggregationBuffer buffer = new MapAggregationBuffer();
+            reset(buffer);
+            return buffer;
+        }
+
+        /**
+         * Adds one (key, value) row to the buffer. Rows whose key is {@code null} are skipped.
+         * The third parameter carries the requested size; its absolute value is used as the
+         * bound when the container is created on the first accepted row.
+         */
+        @Override
+        public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
+                Object[] parameters) throws HiveException {
+            assert (parameters.length == 3);
+            final Object rawKey = parameters[0];
+            if (rawKey == null) {
+                return;
+            }
+
+            final Object stdKey = ObjectInspectorUtils.copyToStandardObject(rawKey, inputKeyOI);
+            final Object stdValue = ObjectInspectorUtils.copyToStandardObject(parameters[1],
+                inputValueOI);
+            // size could be negative for tail-k, so only its magnitude is used
+            final int bound = Math.abs(HiveUtils.getInt(parameters[2], sizeOI));
+
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            if (buffer.container == null) {
+                initBuffer(buffer, bound);
+            }
+            buffer.container.put(stdKey, stdValue);
+        }
+
+        /**
+         * Creates the bounded container for {@code agg} and records the bound.
+         * The map is built with {@code true} as the second constructor argument of
+         * {@code BoundedSortedMap}.
+         *
+         * @param agg buffer to initialize
+         * @param size bound of the map; must be greater than zero
+         */
+        void initBuffer(@Nonnull MapAggregationBuffer agg, @Nonnegative int size) {
+            // message typo fixed: "greather" -> "greater"
+            Preconditions.checkArgument(size > 0, "size MUST be greater than zero: " + size);
+
+            agg.container = new BoundedSortedMap<Object, Object>(size, true);
+            agg.size = size;
+        }
+
+        /**
+         * Emits the partial result as a two-element array: the container followed by the
+         * bound wrapped in an {@link IntWritable}.
+         */
+        @Override
+        public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            // element order follows the field order declared in internalMergeOI
+            return new Object[] {buffer.container, new IntWritable(buffer.size)};
+        }
+
+        /**
+         * Folds a partial aggregation result into {@code agg}.
+         * <p>
+         * A {@code null} partial, or a partial whose map field resolves to {@code null}, is
+         * ignored. When the buffer has no container yet, it is created through
+         * {@code initBuffer} using the size carried in the partial struct.
+         */
+        @Override
+        public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
+                throws HiveException {
+            if (partial == null) {
+                return;
+            }
+
+            MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+
+            Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
+            Map<?, ?> partialMap = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
+            if (partialMap == null) {
+                return;
+            }
+
+            // the size field is only read when this buffer has not been initialized yet
+            if (myagg.container == null) {
+                Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
+                int size = HiveUtils.getInt(sizeObj, sizeOI);
+                initBuffer(myagg, size);
+            }
+            // keys and values are copied to standard objects before being stored
+            for (Map.Entry<?, ?> e : partialMap.entrySet()) {
+                Object key = ObjectInspectorUtils.copyToStandardObject(e.getKey(), inputKeyOI);
+                Object value = ObjectInspectorUtils.copyToStandardObject(e.getValue(), inputValueOI);
+                myagg.container.put(key, value);
             }
         }
 
+        /**
+         * @return the buffer's container as the final result; {@code null} if the container
+         *         was never initialized
+         */
+        @Override
+        @Nullable
+        public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            return ((MapAggregationBuffer) agg).container;
+        }
+
     }
+
+    /**
+     * Variant of {@link TopKOrderedMapEvaluator} that differs only in how the container is
+     * created: it uses the single-argument {@code BoundedSortedMap} constructor instead of
+     * passing {@code true} as the second argument.
+     */
+    public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
+
+        /**
+         * Creates the bounded container for {@code agg} and records the bound.
+         *
+         * @param agg buffer to initialize
+         * @param size bound of the map; must be greater than zero
+         */
+        @Override
+        void initBuffer(@Nonnull MapAggregationBuffer agg, @Nonnegative int size) {
+            // same guard as the overridden method, so both variants reject a non-positive bound
+            Preconditions.checkArgument(size > 0, "size MUST be greater than zero: " + size);
+
+            agg.container = new BoundedSortedMap<Object, Object>(size);
+            agg.size = size;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
index afa8a58..8fba349 100644
--- a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
+++ b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
@@ -326,6 +326,18 @@ public final class HiveUtils {
         return ObjectInspectorUtils.isConstantObjectInspector(oi) && isStringOI(oi);
     }
 
+    /**
+     * @return {@code true} when {@code oi} is a constant object inspector for which
+     *         {@code isIntOI} also holds
+     */
+    public static boolean isConstInt(@Nonnull final ObjectInspector oi) {
+        if (!ObjectInspectorUtils.isConstantObjectInspector(oi)) {
+            return false;
+        }
+        return isIntOI(oi);
+    }
+
+    /**
+     * @return {@code true} when {@code oi} is a constant object inspector for which
+     *         {@code isIntegerOI} also holds
+     */
+    public static boolean isConstInteger(@Nonnull final ObjectInspector oi) {
+        if (!ObjectInspectorUtils.isConstantObjectInspector(oi)) {
+            return false;
+        }
+        return isIntegerOI(oi);
+    }
+
+    /**
+     * @return {@code true} when {@code oi} is a constant object inspector for which
+     *         {@code isBooleanOI} also holds
+     */
+    public static boolean isConstBoolean(@Nonnull final ObjectInspector oi) {
+        if (!ObjectInspectorUtils.isConstantObjectInspector(oi)) {
+            return false;
+        }
+        return isBooleanOI(oi);
+    }
+
     public static boolean isPrimitiveTypeInfo(@Nonnull TypeInfo typeInfo) {
         return typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
index 61642f1..38bc5ae 100644
--- a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -18,10 +18,12 @@
  */
 package hivemall.tools.map;
 
-import hivemall.tools.map.UDAFToOrderedMap.AscendingMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.DescendingMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
 
-import java.util.SortedMap;
+import java.util.Map;
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -33,8 +35,8 @@ public class UDAFToOrderedMapTest {
 
     @Test
     public void testNaturalOrder() throws Exception {
-        AscendingMapEvaluator evaluator = new AscendingMapEvaluator(0);
-        AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        NaturalOrderedMapEvaluator evaluator = new NaturalOrderedMapEvaluator();
+        NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -50,7 +52,7 @@ public class UDAFToOrderedMapTest {
             evaluator.iterate(agg, new Object[] {keys[i], values[i]});
         }
 
-        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Map<Object, Object> res = evaluator.terminate(agg);
         Object[] sortedValues = res.values().toArray();
 
         Assert.assertEquals(3, sortedValues.length);
@@ -63,8 +65,8 @@ public class UDAFToOrderedMapTest {
 
     @Test
     public void testReverseOrder() throws Exception {
-        DescendingMapEvaluator evaluator = new DescendingMapEvaluator(0);
-        DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        ReverseOrderedMapEvaluator evaluator = new ReverseOrderedMapEvaluator();
+        ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -81,7 +83,7 @@ public class UDAFToOrderedMapTest {
             evaluator.iterate(agg, new Object[] {keys[i], values[i]});
         }
 
-        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Map<Object, Object> res = evaluator.terminate(agg);
         Object[] sortedValues = res.values().toArray();
 
         Assert.assertEquals(3, sortedValues.length);
@@ -94,9 +96,8 @@ public class UDAFToOrderedMapTest {
 
     @Test
     public void testTopK() throws Exception {
-        int size = 2;
-        DescendingMapEvaluator evaluator = new DescendingMapEvaluator(size);
-        DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        TopKOrderedMapEvaluator evaluator = new TopKOrderedMapEvaluator();
+        TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -105,6 +106,7 @@ public class UDAFToOrderedMapTest {
 
         final double[] keys = new double[] {0.7, 0.5, 0.8};
         final String[] values = new String[] {"banana", "apple", "candy"};
+        int size = 2;
 
         evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
         evaluator.reset(agg);
@@ -113,7 +115,7 @@ public class UDAFToOrderedMapTest {
             evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
         }
 
-        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Map<Object, Object> res = evaluator.terminate(agg);
         Object[] sortedValues = res.values().toArray();
 
         Assert.assertEquals(size, sortedValues.length);
@@ -125,9 +127,8 @@ public class UDAFToOrderedMapTest {
 
     @Test
     public void testTailK() throws Exception {
-        int size = -2;
-        AscendingMapEvaluator evaluator = new AscendingMapEvaluator(Math.abs(size));
-        AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+        TailKOrderedMapEvaluator evaluator = new TailKOrderedMapEvaluator();
+        TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
 
         ObjectInspector[] inputOIs = new ObjectInspector[] {
                 PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -136,6 +137,7 @@ public class UDAFToOrderedMapTest {
 
         final double[] keys = new double[] {0.7, 0.5, 0.8};
         final String[] values = new String[] {"banana", "apple", "candy"};
+        int size = -2;
 
         evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
         evaluator.reset(agg);
@@ -144,7 +146,7 @@ public class UDAFToOrderedMapTest {
             evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
         }
 
-        SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+        Map<Object, Object> res = evaluator.terminate(agg);
         Object[] sortedValues = res.values().toArray();
 
         Assert.assertEquals(Math.abs(size), sortedValues.length);

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/docs/gitbook/misc/generic_funcs.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/generic_funcs.md b/docs/gitbook/misc/generic_funcs.md
index 66b30e2..03e1ef3 100644
--- a/docs/gitbook/misc/generic_funcs.md
+++ b/docs/gitbook/misc/generic_funcs.md
@@ -85,7 +85,36 @@ This page describes a list of useful Hivemall generic functions.
 
 ## List UDAF
 
-- `to_ordered_list(value [, const string options])` or `to_ordered_list(value, key [, const string options])` - Return list of values sorted by value itself or specific key
+- `to_ordered_list(PRIMITIVE value [, PRIMITIVE key, const string options])` or `to_ordered_list(value, key [, const string options])` - Returns a list of values sorted by the value itself or by a specific key
+
+    ```sql
+    with t as (
+        select 5 as key, 'apple' as value
+        union all
+        select 3 as key, 'banana' as value
+        union all
+        select 4 as key, 'candy' as value
+        union all
+        select 2 as key, 'donut' as value
+        union all
+        select 3 as key, 'egg' as value
+    )
+    select                                             -- expected output
+        to_ordered_list(value, key, '-reverse'),       -- [apple, candy, (banana, egg | egg, banana), donut] (reverse order)
+        to_ordered_list(value, key, '-k 2'),           -- [apple, candy] (top-k)
+        to_ordered_list(value, key, '-k 100'),         -- [apple, candy, (banana, egg | egg, banana), donut]
+        to_ordered_list(value, key, '-k 2 -reverse'),  -- [donut, (banana | egg)] (reverse top-k = tail-k)
+        to_ordered_list(value, key),                   -- [donut, (banana, egg | egg, banana), candy, apple] (natural order)
+        to_ordered_list(value, key, '-k -2'),          -- [donut, (banana | egg)] (tail-k)
+        to_ordered_list(value, key, '-k -100'),        -- [donut, (banana, egg | egg, banana), candy, apple]
+        to_ordered_list(value, key, '-k -2 -reverse'), -- [apple, candy] (reverse tail-k = top-k)
+        to_ordered_list(value, '-k 2'),                -- [egg, donut] (alphabetically)    
+        to_ordered_list(key, '-k -2 -reverse'),        -- [5, 4] (top-2 keys)
+        to_ordered_list(key)                           -- [2, 3, 3, 4, 5] (natural ordered keys)
+    from 
+        t
+    ;
+    ```
 
 # Bitset functions
 
@@ -147,6 +176,29 @@ The compression level must be in range [-1,9]
 
 - `to_ordered_map(key, value [, const int k|const boolean reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
 
+    ```sql
+    with t as (
+        select 10 as key, 'apple' as value
+        union all
+        select 3 as key, 'banana' as value
+        union all
+        select 4 as key, 'candy' as value
+    )
+    select
+        to_ordered_map(key, value, true),   -- {10:"apple",4:"candy",3:"banana"} (reverse)
+        to_ordered_map(key, value, 1),      -- {10:"apple"} (top-1)
+        to_ordered_map(key, value, 2),      -- {10:"apple",4:"candy"} (top-2)
+        to_ordered_map(key, value, 3),      -- {10:"apple",4:"candy",3:"banana"} (top-3)
+        to_ordered_map(key, value, 100),    -- {10:"apple",4:"candy",3:"banana"} (top-100)
+        to_ordered_map(key, value),         -- {3:"banana",4:"candy",10:"apple"} (natural)
+        to_ordered_map(key, value, -1),     -- {3:"banana"} (tail-1)
+        to_ordered_map(key, value, -2),     -- {3:"banana",4:"candy"} (tail-2)
+        to_ordered_map(key, value, -3),     -- {3:"banana",4:"candy",10:"apple"} (tail-3)
+        to_ordered_map(key, value, -100)    -- {3:"banana",4:"candy",10:"apple"} (tail-100)
+    from t
+    ;
+    ```
+
 # MapReduce functions
 
 - `rowid()` - Returns a generated row id of a form {TASK_ID}-{SEQUENCE_NUMBER}