You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hivemall.apache.org by my...@apache.org on 2017/09/13 12:13:11 UTC
[1/5] incubator-hivemall git commit: Close #108: [HIVEMALL-138]
to_ordered_map & to_ordered_list as a UDAF variant of each_top_k
Repository: incubator-hivemall
Updated Branches:
refs/heads/master 380478916 -> 688daa5f8
Close #108: [HIVEMALL-138] to_ordered_map & to_ordered_list as a UDAF variant of each_top_k
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/9cd3c59a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/9cd3c59a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/9cd3c59a
Branch: refs/heads/master
Commit: 9cd3c59aebb67cc6b58cdd611b96fcf42f297cde
Parents: 3804789
Author: Takuya Kitazawa <k....@gmail.com>
Authored: Mon Sep 11 15:38:05 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Mon Sep 11 15:38:05 2017 +0900
----------------------------------------------------------------------
.../hivemall/tools/list/UDAFToOrderedList.java | 535 +++++++++++++++++++
.../hivemall/tools/map/UDAFToOrderedMap.java | 214 +++++++-
.../java/hivemall/utils/hadoop/HiveUtils.java | 7 +
.../tools/array/SelectKBeatUDFTest.java | 69 ---
.../tools/array/SelectKBestUDFTest.java | 69 +++
.../tools/list/UDAFToOrderedListTest.java | 344 ++++++++++++
.../tools/map/UDAFToOrderedMapTest.java | 153 ++++++
docs/gitbook/eval/rank.md | 5 +
docs/gitbook/misc/generic_funcs.md | 7 +-
docs/gitbook/misc/topk.md | 63 +++
docs/gitbook/recommend/item_based_cf.md | 5 +
docs/gitbook/recommend/movielens_cf.md | 5 +
resources/ddl/define-all-as-permanent.hive | 7 +
resources/ddl/define-all.hive | 7 +
resources/ddl/define-all.spark | 7 +
resources/ddl/define-udfs.td.hql | 1 +
16 files changed, 1419 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
new file mode 100644
index 0000000..16c966a
--- /dev/null
+++ b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.list;
+
+import hivemall.utils.collections.BoundedPriorityQueue;
+import hivemall.utils.hadoop.HiveUtils;
+import hivemall.utils.lang.CommandLineUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.*;
+
+/**
+ * Return list of values sorted by value itself or specific key.
+ */
+@Description(
+ name = "to_ordered_list",
+ value = "_FUNC_(value [, key, const string options]) - Return list of values sorted by value itself or specific key")
+public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
+
+    /**
+     * Validates the shape of the argument list and returns the evaluator.
+     *
+     * Accepted forms are {@code (value)}, {@code (value, options)}, {@code (value, key)} and
+     * {@code (value, key, options)}, where {@code options} is a constant string. When values
+     * are sorted by themselves the value must be primitive; when a key is given the key must
+     * be primitive.
+     *
+     * @param info parameter types and object inspectors of the call site
+     * @return a new {@link UDAFToOrderedListEvaluator}
+     * @throws SemanticException if the argument count or the sorted-on type is not acceptable
+     */
+    @Override
+    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
+            throws SemanticException {
+        @SuppressWarnings("deprecation")
+        final TypeInfo[] paramTypes = info.getParameters();
+        final ObjectInspector[] paramOIs = info.getParameterObjectInspectors();
+        final int numParams = paramTypes.length;
+
+        // value only, optionally followed by the constant option string
+        final boolean valueOnly = (numParams == 1)
+                || (numParams == 2 && HiveUtils.isConstString(paramOIs[1]));
+        // value and key, optionally followed by the constant option string
+        final boolean valueAndKey = !valueOnly
+                && ((numParams == 2) || (numParams == 3 && HiveUtils.isConstString(paramOIs[2])));
+
+        if (!valueOnly && !valueAndKey) {
+            throw new UDFArgumentTypeException(numParams - 1,
+                "Number of arguments must be in [1, 3] including constant string for options: "
+                        + numParams);
+        }
+
+        if (valueOnly) {
+            // sort values by value itself w/o key
+            final TypeInfo valueType = paramTypes[0];
+            if (valueType.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+                throw new UDFArgumentTypeException(0,
+                    "Only primitive type arguments are accepted for value but "
+                            + valueType.getTypeName() + " was passed as the first parameter.");
+            }
+        } else {
+            // sort values by key
+            final TypeInfo keyType = paramTypes[1];
+            if (keyType.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+                throw new UDFArgumentTypeException(1,
+                    "Only primitive type arguments are accepted for key but "
+                            + keyType.getTypeName() + " was passed as the second parameter.");
+            }
+        }
+        return new UDAFToOrderedListEvaluator();
+    }
+
+ public static class UDAFToOrderedListEvaluator extends GenericUDAFEvaluator {
+
+ // Inspectors for a single value and a single sort key. In PARTIAL1/COMPLETE mode they come
+ // from the call arguments; otherwise init() re-derives them from the element inspectors of
+ // the "valueList" and "keyList" fields of the partial result.
+ private ObjectInspector valueOI;
+ private PrimitiveObjectInspector keyOI;
+
+ // List inspectors used by merge() to read the value and key lists of a partial result.
+ private ListObjectInspector valueListOI;
+ private ListObjectInspector keyListOI;
+
+ // Struct inspector of the partial result; assigned in init() only when the input is a
+ // partial aggregation.
+ private StructObjectInspector internalMergeOI;
+
+ // Field references into the partial-result struct, resolved by name in init().
+ private StructField valueListField;
+ private StructField keyListField;
+ private StructField sizeField;
+ private StructField reverseOrderField;
+
+ // Absolute value of the `-k` option as computed by processOptions(); stays 0 when `-k` is
+ // not given.
+ @Nonnegative
+ private int size;
+ // Ordering flag computed by processOptions() and handed to the aggregation buffer in reset().
+ private boolean reverseOrder;
+ // True when a separate key argument was supplied; consulted by processOptions() and iterate().
+ private boolean sortByKey;
+
+        /**
+         * Declares the options understood in the constant option string: {@code -k} (takes a
+         * value) and the flag {@code -reverse} / {@code -reverse_order}.
+         *
+         * @return a freshly built option set
+         */
+        protected Options getOptions() {
+            return new Options().addOption("k", true,
+                "To top-k (positive) or tail-k (negative) ordered queue")
+                                .addOption("reverse", "reverse_order", false,
+                                    "Sort values by key in a reverse (e.g., descending) order [default: false]");
+        }
+
+        /**
+         * Splits the option string on whitespace and parses it against {@link #getOptions()}
+         * plus an extra {@code -help} flag.
+         *
+         * When {@code -help} is present, the usage text is rendered and thrown as a
+         * {@link UDFArgumentException} so that it is reported to the user.
+         *
+         * @param optionValue raw option string, e.g. {@code "-k 3 -reverse"}
+         * @return the parsed command line
+         * @throws UDFArgumentException carrying the usage text when {@code -help} was requested
+         */
+        @Nonnull
+        protected final CommandLine parseOptions(String optionValue) throws UDFArgumentException {
+            String[] args = optionValue.split("\\s+");
+            Options opts = getOptions();
+            opts.addOption("help", false, "Show function help");
+            CommandLine cl = CommandLineUtils.parseOptions(args, opts);
+
+            if (cl.hasOption("help")) {
+                // @Description is declared on the enclosing resolver class, not on this
+                // evaluator, so fall back to the resolver when the evaluator (or a subclass)
+                // carries no annotation of its own.
+                Description funcDesc = getClass().getAnnotation(Description.class);
+                if (funcDesc == null) {
+                    funcDesc = UDAFToOrderedList.class.getAnnotation(Description.class);
+                }
+                final String cmdLineSyntax;
+                if (funcDesc == null) {
+                    cmdLineSyntax = getClass().getSimpleName();
+                } else {
+                    cmdLineSyntax = funcDesc.value().replace("_FUNC_", funcDesc.name());
+                }
+                StringWriter sw = new StringWriter();
+                sw.write('\n');
+                PrintWriter pw = new PrintWriter(sw);
+                HelpFormatter formatter = new HelpFormatter();
+                formatter.printHelp(pw, HelpFormatter.DEFAULT_WIDTH, cmdLineSyntax, null, opts,
+                    HelpFormatter.DEFAULT_LEFT_PAD, HelpFormatter.DEFAULT_DESC_PAD, null, true);
+                pw.flush();
+                String helpMsg = sw.toString();
+                throw new UDFArgumentException(helpMsg);
+            }
+
+            return cl;
+        }
+
+        /**
+         * Reads the optional constant option string and derives {@code size} and
+         * {@code reverseOrder} from it.
+         *
+         * The option string is expected right after the value (index 1) or, when sorting by
+         * key, right after the key (index 2). Without it, {@code k} is 0 and the reverse flag
+         * is false.
+         *
+         * @param argOIs object inspectors of the call arguments
+         * @return the parsed command line, or {@code null} when no option string was given
+         * @throws UDFArgumentException if {@code -k} is not an integer or is zero, or if help
+         *         was requested
+         */
+        protected CommandLine processOptions(ObjectInspector[] argOIs) throws UDFArgumentException {
+            // the option string follows the value (and the key, when sorting by key)
+            final int optionIndex = sortByKey ? 2 : 1;
+
+            CommandLine cl = null;
+            int k = 0;
+            boolean reverseOrder = false;
+
+            if (argOIs.length >= optionIndex + 1) {
+                String rawArgs = HiveUtils.getConstString(argOIs[optionIndex]);
+                cl = parseOptions(rawArgs);
+
+                reverseOrder = cl.hasOption("reverse_order");
+
+                if (cl.hasOption("k")) {
+                    final String rawK = cl.getOptionValue("k");
+                    try {
+                        k = Integer.parseInt(rawK);
+                    } catch (NumberFormatException e) {
+                        // report a malformed `-k` as an argument error rather than letting the
+                        // unchecked exception escape
+                        throw new UDFArgumentException("`k` must be an integer: " + rawK);
+                    }
+                    if (k == 0) {
+                        throw new UDFArgumentException("`k` must be nonzero: " + k);
+                    }
+                }
+            }
+
+            this.size = Math.abs(k);
+
+            // true  : reverse top-k, natural tail-k, or natural order without k
+            //         = ascending = natural order output = reverse order priority queue
+            // false : natural top-k, reverse tail-k, or reverse order without k
+            //         = descending = reverse order output = natural order priority queue
+            this.reverseOrder = ((k > 0) == reverseOrder);
+
+            return cl;
+        }
+
+        /**
+         * Sets up the input inspectors for the given mode and returns the output inspector.
+         *
+         * In PARTIAL1/COMPLETE mode the inspectors are taken from the call arguments and the
+         * options are processed. In the other modes they are re-derived from the partial-result
+         * struct (fields "valueList", "keyList", "size", "reverseOrder").
+         *
+         * @return the partial-result struct inspector for PARTIAL1/PARTIAL2, otherwise a
+         *         standard list inspector over the value type
+         */
+        @Override
+        public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
+            super.init(mode, argOIs);
+
+            // initialize input
+            final boolean readsRawRows = (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE);
+            if (readsRawRows) {
+                // this flag is consulted by `processOptions` and `iterate`, both of which only
+                // run in Mode.PARTIAL1 or Mode.COMPLETE
+                final int numArgs = argOIs.length;
+                this.sortByKey = (numArgs == 2 && !HiveUtils.isConstString(argOIs[1]))
+                        || (numArgs == 3 && HiveUtils.isConstString(argOIs[2]));
+
+                if (!sortByKey) {
+                    // the value doubles as its own sort key
+                    this.valueOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+                    this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+                } else {
+                    this.valueOI = argOIs[0];
+                    this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[1]);
+                }
+
+                processOptions(argOIs);
+            } else {
+                // input is a partial aggregation
+                final StructObjectInspector partialOI = (StructObjectInspector) argOIs[0];
+                this.internalMergeOI = partialOI;
+
+                // recover the value inspector from the "valueList" field
+                this.valueListField = partialOI.getStructFieldRef("valueList");
+                final StandardListObjectInspector rawValueListOI = (StandardListObjectInspector) valueListField.getFieldObjectInspector();
+                this.valueOI = rawValueListOI.getListElementObjectInspector();
+                this.valueListOI = ObjectInspectorFactory.getStandardListObjectInspector(valueOI);
+
+                // recover the key inspector from the "keyList" field
+                this.keyListField = partialOI.getStructFieldRef("keyList");
+                final StandardListObjectInspector rawKeyListOI = (StandardListObjectInspector) keyListField.getFieldObjectInspector();
+                this.keyOI = HiveUtils.asPrimitiveObjectInspector(rawKeyListOI.getListElementObjectInspector());
+                this.keyListOI = ObjectInspectorFactory.getStandardListObjectInspector(keyOI);
+
+                this.sizeField = partialOI.getStructFieldRef("size");
+                this.reverseOrderField = partialOI.getStructFieldRef("reverseOrder");
+            }
+
+            // initialize output
+            final boolean emitsPartial = (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2);
+            if (emitsPartial) {
+                // consumed by terminatePartial
+                return internalMergeOI(valueOI, keyOI);
+            }
+            // consumed by terminate
+            return ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(valueOI));
+        }
+
+        /**
+         * Builds the struct inspector describing a partial result: the fields "valueList",
+         * "keyList", "size" (writable int) and "reverseOrder" (writable boolean), in that order.
+         */
+        private static StructObjectInspector internalMergeOI(@Nonnull ObjectInspector valueOI,
+                @Nonnull PrimitiveObjectInspector keyOI) {
+            final ObjectInspector stdValueOI = ObjectInspectorUtils.getStandardObjectInspector(valueOI);
+            final ObjectInspector stdKeyOI = ObjectInspectorUtils.getStandardObjectInspector(keyOI);
+
+            final ArrayList<String> names = new ArrayList<String>();
+            final ArrayList<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
+
+            names.add("valueList");
+            inspectors.add(ObjectInspectorFactory.getStandardListObjectInspector(stdValueOI));
+
+            names.add("keyList");
+            inspectors.add(ObjectInspectorFactory.getStandardListObjectInspector(stdKeyOI));
+
+            names.add("size");
+            inspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+
+            names.add("reverseOrder");
+            inspectors.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
+
+            return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
+        }
+
+        /**
+         * Creates an aggregation buffer and resets it with the evaluator's current
+         * {@code size} and {@code reverseOrder}.
+         */
+        @SuppressWarnings("deprecation")
+        @Override
+        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+            final QueueAggregationBuffer buffer = new QueueAggregationBuffer();
+            reset(buffer);
+            return buffer;
+        }
+
+        /**
+         * Re-applies the evaluator's {@code size} and {@code reverseOrder} to the buffer and
+         * discards its queue.
+         */
+        @Override
+        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            ((QueueAggregationBuffer) agg).reset(size, reverseOrder);
+        }
+
+        /**
+         * Offers one input row to the buffer as a (key, value) tuple.
+         *
+         * Rows with a null value are skipped, as are rows with a null key when sorting by key.
+         * Without a separate key, a second standard copy of the value is used as the key.
+         */
+        @Override
+        public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
+                Object[] parameters) throws HiveException {
+            final Object rawValue = parameters[0];
+            if (rawValue == null) {
+                return;
+            }
+            final Object value = ObjectInspectorUtils.copyToStandardObject(rawValue, valueOI);
+
+            final Object key;
+            if (!sortByKey) {
+                // the value is its own sort key
+                key = ObjectInspectorUtils.copyToStandardObject(rawValue, valueOI);
+            } else if (parameters[1] != null) {
+                key = ObjectInspectorUtils.copyToStandardObject(parameters[1], keyOI);
+            } else {
+                return;
+            }
+
+            final TupleWithKey tuple = new TupleWithKey(key, value);
+            ((QueueAggregationBuffer) agg).iterate(tuple);
+        }
+
+        /**
+         * Drains the buffer into a partial result laid out as
+         * {@code [valueList, keyList, size, reverseOrder]}.
+         *
+         * @return the four-element partial result, or {@code null} when the drained value list
+         *         is empty
+         */
+        @Override
+        public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final QueueAggregationBuffer buffer = (QueueAggregationBuffer) agg;
+
+            final Map<String, List<Object>> drained = buffer.drainQueue();
+            final List<Object> values = drained.get("value");
+            final List<Object> keys = drained.get("key");
+            if (values.size() == 0) {
+                return null;
+            }
+
+            return new Object[] {values, keys, new IntWritable(buffer.size),
+                    new BooleanWritable(buffer.reverseOrder)};
+        }
+
+        /**
+         * Folds one partial result into the buffer.
+         *
+         * Values and keys are copied to standard objects, the partial's {@code size} and
+         * {@code reverseOrder} are applied to the buffer, and the (key, value) pairs are then
+         * offered to it. A null partial is ignored.
+         */
+        @Override
+        public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
+                throws HiveException {
+            if (partial == null) {
+                return;
+            }
+
+            final Object valueListObj = internalMergeOI.getStructFieldData(partial, valueListField);
+            final List<?> rawValues = valueListOI.getList(HiveUtils.castLazyBinaryObject(valueListObj));
+            final List<Object> values = new ArrayList<Object>();
+            for (Object rawValue : rawValues) {
+                values.add(ObjectInspectorUtils.copyToStandardObject(rawValue, valueOI));
+            }
+
+            final Object keyListObj = internalMergeOI.getStructFieldData(partial, keyListField);
+            final List<?> rawKeys = keyListOI.getList(HiveUtils.castLazyBinaryObject(keyListObj));
+            final List<Object> keys = new ArrayList<Object>();
+            for (Object rawKey : rawKeys) {
+                keys.add(ObjectInspectorUtils.copyToStandardObject(rawKey, keyOI));
+            }
+
+            final Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
+            final int partialSize = PrimitiveObjectInspectorFactory.writableIntObjectInspector.get(sizeObj);
+
+            final Object reverseOrderObj = internalMergeOI.getStructFieldData(partial, reverseOrderField);
+            final boolean partialReverseOrder = PrimitiveObjectInspectorFactory.writableBooleanObjectInspector.get(reverseOrderObj);
+
+            final QueueAggregationBuffer buffer = (QueueAggregationBuffer) agg;
+            buffer.setOptions(partialSize, partialReverseOrder);
+            buffer.merge(keys, values);
+        }
+
+        /**
+         * Drains the buffer and returns the ordered values; the keys are dropped.
+         */
+        @Override
+        public Object terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final QueueAggregationBuffer buffer = (QueueAggregationBuffer) agg;
+            return buffer.drainQueue().get("value");
+        }
+
+        /**
+         * Aggregation state: a lazily created priority queue of (key, value) tuples plus the
+         * options that decide how the queue is built.
+         *
+         * The queue is bounded to {@code size} entries when {@code size > 0} and unbounded
+         * otherwise. It is created on the first {@link #iterate} or {@link #merge} call, so the
+         * options set beforehand are the ones that take effect.
+         */
+        static class QueueAggregationBuffer extends AbstractAggregationBuffer {
+
+            // null until the first tuple is offered
+            private AbstractQueueHandler queueHandler;
+
+            @Nonnegative
+            private int size;
+            private boolean reverseOrder;
+
+            QueueAggregationBuffer() {
+                super();
+            }
+
+            /** Applies the given options and discards any existing queue. */
+            void reset(@Nonnegative int size, boolean reverseOrder) {
+                setOptions(size, reverseOrder);
+                this.queueHandler = null;
+            }
+
+            /** Stores the options; they are read when the queue is next created. */
+            void setOptions(@Nonnegative int size, boolean reverseOrder) {
+                this.size = size;
+                this.reverseOrder = reverseOrder;
+            }
+
+            /** Offers a single tuple, creating the queue on first use. */
+            void iterate(TupleWithKey tuple) {
+                if (queueHandler == null) {
+                    initQueueHandler();
+                }
+                queueHandler.offer(tuple);
+            }
+
+            /**
+             * Offers the i-th key paired with the i-th value for every index of
+             * {@code o_keyList}, creating the queue on first use.
+             */
+            void merge(List<Object> o_keyList, List<Object> o_valueList) {
+                if (queueHandler == null) {
+                    initQueueHandler();
+                }
+                for (int i = 0, n = o_keyList.size(); i < n; i++) {
+                    queueHandler.offer(new TupleWithKey(o_keyList.get(i), o_valueList.get(i)));
+                }
+            }
+
+            /**
+             * Empties the queue into two parallel lists stored under "key" and "value".
+             *
+             * Tuples are polled head first and written from the end of the arrays towards the
+             * front, so the lists are in the reverse of polling order.
+             *
+             * @return a map with the entries "key" and "value"; both lists are empty when no
+             *         tuple has been offered since the last reset
+             */
+            @Nonnull
+            Map<String, List<Object>> drainQueue() {
+                final Map<String, List<Object>> res = new HashMap<String, List<Object>>();
+                if (queueHandler == null) {
+                    // No tuple was ever offered (e.g. every input row was skipped as null), so
+                    // the queue was never created; report an empty result instead of failing
+                    // with a NullPointerException.
+                    res.put("key", Collections.<Object>emptyList());
+                    res.put("value", Collections.<Object>emptyList());
+                    return res;
+                }
+
+                int n = queueHandler.size();
+                final Object[] keys = new Object[n];
+                final Object[] values = new Object[n];
+                for (int i = n - 1; i >= 0; i--) { // head element in queue should be stored to tail of array
+                    TupleWithKey tuple = queueHandler.poll();
+                    keys[i] = tuple.getKey();
+                    values[i] = tuple.getValue();
+                }
+                queueHandler.clear();
+
+                res.put("key", Arrays.asList(keys));
+                res.put("value", Arrays.asList(values));
+                return res;
+            }
+
+            /** Creates the queue from the current {@code size} and {@code reverseOrder}. */
+            private void initQueueHandler() {
+                final Comparator<TupleWithKey> comparator;
+                if (reverseOrder) {
+                    comparator = Collections.reverseOrder();
+                } else {
+                    comparator = new Comparator<TupleWithKey>() {
+                        @Override
+                        public int compare(TupleWithKey o1, TupleWithKey o2) {
+                            return o1.compareTo(o2);
+                        }
+                    };
+                }
+
+                if (size > 0) {
+                    this.queueHandler = new BoundedQueueHandler(size, comparator);
+                } else {
+                    this.queueHandler = new QueueHandler(comparator);
+                }
+            }
+
+        }
+
+        /**
+         * Common facade over the two queue implementations used by the aggregation buffer.
+         * BoundedPriorityQueue does not directly inherit PriorityQueue, so each of them is
+         * wrapped by its own handler subclass.
+         */
+        private abstract static class AbstractQueueHandler {
+
+            /** Adds a tuple to the underlying queue. */
+            abstract void offer(TupleWithKey tuple);
+
+            /** Removes and returns the head of the underlying queue. */
+            abstract TupleWithKey poll();
+
+            /** Returns the number of tuples currently held. */
+            abstract int size();
+
+            /** Removes all tuples. */
+            abstract void clear();
+
+        }
+
+        /**
+         * Handler backed by an unbounded {@link PriorityQueue}.
+         */
+        private static final class QueueHandler extends AbstractQueueHandler {
+
+            private static final int DEFAULT_INITIAL_CAPACITY = 11; // same as PriorityQueue
+
+            private final PriorityQueue<TupleWithKey> queue;
+
+            QueueHandler(@Nonnull Comparator<TupleWithKey> comparator) {
+                this.queue = new PriorityQueue<TupleWithKey>(DEFAULT_INITIAL_CAPACITY, comparator);
+            }
+
+            @Override
+            void offer(TupleWithKey tuple) {
+                this.queue.offer(tuple);
+            }
+
+            @Override
+            TupleWithKey poll() {
+                return this.queue.poll();
+            }
+
+            @Override
+            int size() {
+                return this.queue.size();
+            }
+
+            @Override
+            void clear() {
+                this.queue.clear();
+            }
+
+        }
+
+        /**
+         * Handler backed by a {@code BoundedPriorityQueue} constructed with the given size and
+         * comparator.
+         */
+        private static final class BoundedQueueHandler extends AbstractQueueHandler {
+
+            private final BoundedPriorityQueue<TupleWithKey> queue;
+
+            BoundedQueueHandler(int size, @Nonnull Comparator<TupleWithKey> comparator) {
+                this.queue = new BoundedPriorityQueue<TupleWithKey>(size, comparator);
+            }
+
+            @Override
+            void offer(TupleWithKey tuple) {
+                this.queue.offer(tuple);
+            }
+
+            @Override
+            TupleWithKey poll() {
+                return this.queue.poll();
+            }
+
+            @Override
+            int size() {
+                return this.queue.size();
+            }
+
+            @Override
+            void clear() {
+                this.queue.clear();
+            }
+
+        }
+
+        /**
+         * Immutable (key, value) pair ordered by its key only; the value takes no part in the
+         * comparison.
+         */
+        private static final class TupleWithKey implements Comparable<TupleWithKey> {
+            private final Object key;
+            private final Object value;
+
+            TupleWithKey(Object key, Object value) {
+                this.key = key;
+                this.value = value;
+            }
+
+            Object getKey() {
+                return key;
+            }
+
+            Object getValue() {
+                return value;
+            }
+
+            /**
+             * Compares the two keys through the key's own {@link Comparable} implementation.
+             *
+             * @throws ClassCastException if this key does not implement {@link Comparable}
+             */
+            @Override
+            public int compareTo(TupleWithKey o) {
+                // The key's static type is Object, so the cast cannot be checked at compile
+                // time; a non-comparable key fails here with ClassCastException.
+                @SuppressWarnings("unchecked")
+                final Comparable<Object> k = (Comparable<Object>) key;
+                return k.compareTo(o.getKey());
+            }
+        }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index 4d011cd..3e6caa4 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -20,23 +20,37 @@ package hivemall.tools.map;
import hivemall.utils.hadoop.HiveUtils;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.IntWritable;
+
+import javax.annotation.Nonnull;
/**
* Convert two aggregated columns into a sorted key-value map.
*/
@Description(name = "to_ordered_map",
- value = "_FUNC_(key, value [, const boolean reverseOrder=false]) "
+ value = "_FUNC_(key, value [, const int k|const boolean reverseOrder=false]) "
+ "- Convert two aggregated columns into an ordered key-value map")
public class UDAFToOrderedMap extends UDAFToMap {
@@ -54,19 +68,35 @@ public class UDAFToOrderedMap extends UDAFToMap {
"Only primitive type arguments are accepted for the key but "
+ typeInfo[0].getTypeName() + " was passed as parameter 1.");
}
+
boolean reverseOrder = false;
+ int size = 0;
if (typeInfo.length == 3) {
- if (HiveUtils.isBooleanTypeInfo(typeInfo[2]) == false) {
- throw new UDFArgumentTypeException(2, "The three argument must be boolean type: "
- + typeInfo[2].getTypeName());
- }
ObjectInspector[] argOIs = info.getParameterObjectInspectors();
- reverseOrder = HiveUtils.getConstBoolean(argOIs[2]);
+ if (HiveUtils.isBooleanTypeInfo(typeInfo[2])) {
+ reverseOrder = HiveUtils.getConstBoolean(argOIs[2]);
+ } else if (HiveUtils.isIntegerTypeInfo(typeInfo[2])) {
+ size = HiveUtils.getConstInt(argOIs[2]);
+ if (size == 0) {
+ throw new UDFArgumentException("Map size must be nonzero: " + size);
+ }
+ reverseOrder = (size > 0); // positive size => top-k
+ } else {
+ throw new UDFArgumentTypeException(2,
+ "The third argument must be boolean or integer type: "
+ + typeInfo[2].getTypeName());
+ }
}
- if (reverseOrder) {
+ if (reverseOrder) { // descending
+ if (size != 0) {
+ return new TopKOrderedMapEvaluator();
+ }
return new ReverseOrderedMapEvaluator();
- } else {
+ } else { // ascending
+ if (size != 0) {
+ return new TailKOrderedMapEvaluator();
+ }
return new NaturalOrderedMapEvaluator();
}
}
@@ -92,4 +122,172 @@ public class UDAFToOrderedMap extends UDAFToMap {
}
+ public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
+
+ // Inspectors for a single key and value. In PARTIAL1/COMPLETE mode they come from the call
+ // arguments; otherwise init() re-derives them from the "partialMap" field of the partial
+ // result.
+ protected PrimitiveObjectInspector inputKeyOI;
+ protected ObjectInspector inputValueOI;
+ // Standard map inspector used by merge() to read the partial map.
+ protected StandardMapObjectInspector partialMapOI;
+ // Inspector for the size: the third call argument, or the "size" field of the partial result.
+ protected PrimitiveObjectInspector sizeOI;
+
+ // Struct inspector of the partial result; assigned in init() only when the input is a
+ // partial aggregation.
+ protected StructObjectInspector internalMergeOI;
+
+ // Field references into the partial-result struct, resolved by name in init().
+ protected StructField partialMapField;
+ protected StructField sizeField;
+
+        /**
+         * Sets up the input inspectors for the given mode and returns the output inspector.
+         *
+         * In PARTIAL1/COMPLETE mode the inspectors are taken from the three call arguments
+         * (key, value, size). In the other modes they are re-derived from the partial-result
+         * struct (fields "partialMap" and "size").
+         *
+         * @return the partial-result struct inspector for PARTIAL1/PARTIAL2, otherwise a
+         *         standard map inspector over the key and value types
+         */
+        @Override
+        public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
+            super.init(mode, argOIs);
+
+            // initialize input
+            final boolean readsRawRows = (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE);
+            if (readsRawRows) {
+                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+                this.inputValueOI = argOIs[1];
+                this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
+            } else {
+                // input is a partial aggregation
+                final StructObjectInspector partialOI = (StructObjectInspector) argOIs[0];
+                this.internalMergeOI = partialOI;
+
+                this.partialMapField = partialOI.getStructFieldRef("partialMap");
+                // recover the key/value inspectors from the map field
+                final StandardMapObjectInspector rawMapOI = (StandardMapObjectInspector) partialMapField.getFieldObjectInspector();
+                this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(rawMapOI.getMapKeyObjectInspector());
+                this.inputValueOI = rawMapOI.getMapValueObjectInspector();
+
+                this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
+                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+                    ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+
+                this.sizeField = partialOI.getStructFieldRef("size");
+                this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
+            }
+
+            // initialize output
+            final boolean emitsPartial = (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2);
+            if (emitsPartial) {
+                // consumed by terminatePartial
+                return internalMergeOI(inputKeyOI, inputValueOI);
+            }
+            // consumed by terminate
+            return ObjectInspectorFactory.getStandardMapObjectInspector(
+                ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+                ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+        }
+
+        /**
+         * Builds the struct inspector describing a partial result: the fields "partialMap"
+         * (standard map of key to value) and "size" (writable int), in that order.
+         */
+        private static StructObjectInspector internalMergeOI(
+                @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
+            final ObjectInspector stdKeyOI = ObjectInspectorUtils.getStandardObjectInspector(keyOI);
+            final ObjectInspector stdValueOI = ObjectInspectorUtils.getStandardObjectInspector(valueOI);
+
+            final ArrayList<String> names = new ArrayList<String>();
+            final ArrayList<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
+
+            names.add("partialMap");
+            inspectors.add(ObjectInspectorFactory.getStandardMapObjectInspector(stdKeyOI, stdValueOI));
+
+            names.add("size");
+            inspectors.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+
+            return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
+        }
+
+        /**
+         * Aggregation state: the collected key-value entries and the number of entries to
+         * keep. Both fields are assigned by the evaluator's {@code reset}.
+         */
+        static class MapAggregationBuffer extends AbstractAggregationBuffer {
+            int size;
+            Map<Object, Object> container;
+
+            MapAggregationBuffer() {
+                super();
+            }
+        }
+
+        /**
+         * Replaces the container with an empty map sorted in reverse natural key order and
+         * sets the size to {@code Integer.MAX_VALUE}.
+         */
+        @Override
+        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            buffer.container = new TreeMap<Object, Object>(Collections.reverseOrder());
+            buffer.size = Integer.MAX_VALUE;
+        }
+
+        /**
+         * Creates an aggregation buffer and initializes it through {@code reset}.
+         */
+        @Override
+        public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
+            final MapAggregationBuffer buffer = new MapAggregationBuffer();
+            reset(buffer);
+            return buffer;
+        }
+
+        /**
+         * Puts one (key, value) row into the container and records the absolute value of the
+         * size argument on the buffer. Rows with a null key are skipped.
+         */
+        @Override
+        public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
+                Object[] parameters) throws HiveException {
+            assert (parameters.length == 3);
+
+            final Object rawKey = parameters[0];
+            if (rawKey == null) {
+                return;
+            }
+
+            final Object key = ObjectInspectorUtils.copyToStandardObject(rawKey, inputKeyOI);
+            final Object value = ObjectInspectorUtils.copyToStandardObject(parameters[1], inputValueOI);
+            // size could be negative for tail-k
+            final int requestedSize = HiveUtils.getInt(parameters[2], sizeOI);
+            final int size = Math.abs(requestedSize);
+
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            buffer.container.put(key, value);
+            buffer.size = size;
+        }
+
+        /**
+         * Returns the partial result laid out as {@code [container, size]}.
+         */
+        @Override
+        public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            return new Object[] {buffer.container, new IntWritable(buffer.size)};
+        }
+
+        /**
+         * Folds one partial result into the buffer: every entry of the partial map is copied
+         * to standard objects and put into the container, then the partial's size replaces
+         * the buffer's size. A null partial is ignored.
+         */
+        @Override
+        public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
+                throws HiveException {
+            if (partial == null) {
+                return;
+            }
+
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+
+            final Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
+            final Map<?, ?> entries = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
+            for (Map.Entry<?, ?> entry : entries.entrySet()) {
+                final Object key = ObjectInspectorUtils.copyToStandardObject(entry.getKey(), inputKeyOI);
+                final Object value = ObjectInspectorUtils.copyToStandardObject(entry.getValue(), inputValueOI);
+                buffer.container.put(key, value);
+            }
+
+            final Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
+            buffer.size = HiveUtils.getInt(sizeObj, sizeOI);
+        }
+
+        /**
+         * Returns the first {@code size} entries of the container in its sort order.
+         *
+         * When the container holds more than {@code size} entries, the result is the head-map
+         * view ending just before the key at position {@code size}; otherwise the container
+         * itself is returned.
+         */
+        @Override
+        public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            final Map<Object, Object> sorted = buffer.container;
+            if (buffer.size >= sorted.size()) {
+                return sorted;
+            }
+            final Object[] orderedKeys = sorted.keySet().toArray();
+            final Object firstExcludedKey = orderedKeys[buffer.size];
+            return ((SortedMap<Object, Object>) sorted).headMap(firstExcludedKey);
+        }
+
+ }
+
+    /**
+     * Variant of {@link TopKOrderedMapEvaluator} whose container is sorted in natural key
+     * order instead of reverse order.
+     */
+    public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
+
+        /**
+         * Replaces the container with an empty naturally ordered map and sets the size to
+         * {@code Integer.MAX_VALUE}.
+         */
+        @Override
+        public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+                throws HiveException {
+            final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+            buffer.size = Integer.MAX_VALUE;
+            buffer.container = new TreeMap<Object, Object>();
+        }
+
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
index db56b82..afa8a58 100644
--- a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
+++ b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
@@ -440,6 +440,13 @@ public final class HiveUtils {
return PrimitiveObjectInspectorUtils.getDouble(o, oi);
}
+    /**
+     * Reads an int from {@code o} through the given inspector.
+     *
+     * @param o the object to read, may be {@code null}
+     * @param oi inspector describing {@code o}
+     * @return the int value, or 0 when {@code o} is {@code null}
+     */
+    public static int getInt(@Nullable Object o, @Nonnull PrimitiveObjectInspector oi) {
+        return (o == null) ? 0 : PrimitiveObjectInspectorUtils.getInt(o, oi);
+    }
+
@SuppressWarnings("unchecked")
@Nullable
public static <T extends Writable> T getConstValue(@Nonnull final ObjectInspector oi)
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java b/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java
deleted file mode 100644
index 3e3fc12..0000000
--- a/core/src/test/java/hivemall/tools/array/SelectKBeatUDFTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package hivemall.tools.array;
-
-import hivemall.utils.hadoop.WritableUtils;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SelectKBeatUDFTest {
-
- @Test
- public void test() throws Exception {
- final SelectKBestUDF selectKBest = new SelectKBestUDF();
- final int k = 2;
- final double[] data = new double[] {250.29999999999998, 170.90000000000003, 73.2,
- 12.199999999999996};
- final double[] importanceList = new double[] {292.1666753739119, 152.70000455081467,
- 187.93333893418327, 59.93333511948589};
-
- final GenericUDF.DeferredObject[] dObjs = new GenericUDF.DeferredObject[] {
- new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(data)),
- new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(importanceList)),
- new GenericUDF.DeferredJavaObject(k)};
-
- selectKBest.initialize(new ObjectInspector[] {
- ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
- ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
- ObjectInspectorUtils.getConstantObjectInspector(
- PrimitiveObjectInspectorFactory.javaIntObjectInspector, k)});
- final List<DoubleWritable> resultObj = selectKBest.evaluate(dObjs);
-
- Assert.assertEquals(resultObj.size(), k);
-
- final double[] result = new double[k];
- for (int i = 0; i < k; i++) {
- result[i] = resultObj.get(i).get();
- }
-
- final double[] answer = new double[] {250.29999999999998, 73.2};
-
- Assert.assertArrayEquals(answer, result, 0.d);
- selectKBest.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java b/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java
new file mode 100644
index 0000000..15366a7
--- /dev/null
+++ b/core/src/test/java/hivemall/tools/array/SelectKBestUDFTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.array;
+
+import hivemall.utils.hadoop.WritableUtils;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SelectKBestUDFTest {
+
+ @Test
+ public void test() throws Exception {
+ final SelectKBestUDF selectKBest = new SelectKBestUDF();
+ final int k = 2;
+ final double[] data = new double[] {250.29999999999998, 170.90000000000003, 73.2,
+ 12.199999999999996};
+ final double[] importanceList = new double[] {292.1666753739119, 152.70000455081467,
+ 187.93333893418327, 59.93333511948589};
+
+ final GenericUDF.DeferredObject[] dObjs = new GenericUDF.DeferredObject[] {
+ new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(data)),
+ new GenericUDF.DeferredJavaObject(WritableUtils.toWritableList(importanceList)),
+ new GenericUDF.DeferredJavaObject(k)};
+
+ selectKBest.initialize(new ObjectInspector[] {
+ ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
+ ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector),
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaIntObjectInspector, k)});
+ final List<DoubleWritable> resultObj = selectKBest.evaluate(dObjs);
+
+ Assert.assertEquals(resultObj.size(), k);
+
+ final double[] result = new double[k];
+ for (int i = 0; i < k; i++) {
+ result[i] = resultObj.get(i).get();
+ }
+
+ final double[] answer = new double[] {250.29999999999998, 73.2};
+
+ Assert.assertArrayEquals(answer, result, 0.d);
+ selectKBest.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
new file mode 100644
index 0000000..f466dbc
--- /dev/null
+++ b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.list;
+
+import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator;
+import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator.QueueAggregationBuffer;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+@SuppressWarnings("deprecation")
+public class UDAFToOrderedListTest {
+
+ GenericUDAFEvaluator evaluator;
+ QueueAggregationBuffer agg;
+
+ @Before
+ public void setUp() throws Exception {
+ this.evaluator = new UDAFToOrderedListEvaluator();
+ this.agg = (QueueAggregationBuffer) evaluator.getNewAggregationBuffer();
+ }
+
+ @Test
+ public void testNaturalOrder() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {PrimitiveObjectInspectorFactory.javaDoubleObjectInspector};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(3, res.size());
+ Assert.assertEquals("apple", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ Assert.assertEquals("candy", res.get(2));
+ }
+
+ @Test
+ public void testReverseOrder() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-reverse_order")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(3, res.size());
+ Assert.assertEquals("candy", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ Assert.assertEquals("apple", res.get(2));
+ }
+
+ @Test
+ public void testTopK() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k 2")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("candy", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+ @Test
+ public void testReverseTopK() throws Exception {
+ // = tail-k
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k 2 -reverse")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("apple", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+ @Test
+ public void testTailK() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k -2")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("apple", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+ @Test
+ public void testReverseTailK() throws Exception {
+ // = top-k
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k -2 -reverse")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("candy", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+ @Test
+ public void testNaturalOrderWithKey() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ final double[] keys = new double[] {0.7, 0.5, 0.7};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i], keys[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(3, res.size());
+ Assert.assertEquals("apple", res.get(0));
+ if (res.get(1) == "banana") { // duplicated key (0.7)
+ Assert.assertEquals("candy", res.get(2));
+ } else {
+ Assert.assertEquals("banana", res.get(2));
+ }
+ }
+
+ @Test
+ public void testReverseOrderWithKey() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-reverse_order")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ final double[] keys = new double[] {0.7, 0.5, 0.7};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i], keys[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(3, res.size());
+ if (res.get(0) == "banana") { // duplicated key (0.7)
+ Assert.assertEquals("candy", res.get(1));
+ } else {
+ Assert.assertEquals("banana", res.get(1));
+ }
+ Assert.assertEquals("apple", res.get(2));
+ }
+
+ @Test
+ public void testTopKWithKey() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k 2")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i], keys[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("candy", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+ @Test
+ public void testReverseTopKWithKey() throws Exception {
+ // = tail-k
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k 2 -reverse")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i], keys[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("apple", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+ @Test
+ public void testTailKWithKey() throws Exception {
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k -2")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i], keys[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("apple", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+ @Test
+ public void testReverseTailKWithKey() throws Exception {
+ // = top-k
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ ObjectInspectorUtils.getConstantObjectInspector(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-k -2 -reverse")};
+
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < values.length; i++) {
+ evaluator.iterate(agg, new Object[] {values[i], keys[i]});
+ }
+
+ List<Object> res = (List<Object>) evaluator.terminate(agg);
+
+ Assert.assertEquals(2, res.size());
+ Assert.assertEquals("candy", res.get(0));
+ Assert.assertEquals("banana", res.get(1));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
new file mode 100644
index 0000000..9289a02
--- /dev/null
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.map;
+
+import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.SortedMap;
+
+@SuppressWarnings("deprecation")
+public class UDAFToOrderedMapTest {
+
+ @Test
+ public void testNaturalOrder() throws Exception {
+ GenericUDAFEvaluator evaluator = new NaturalOrderedMapEvaluator();
+ NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector};
+
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < keys.length; i++) {
+ evaluator.iterate(agg, new Object[] {keys[i], values[i]});
+ }
+
+ SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Object[] sortedValues = res.values().toArray();
+
+ Assert.assertEquals(3, sortedValues.length);
+ Assert.assertEquals("apple", sortedValues[0]);
+ Assert.assertEquals("banana", sortedValues[1]);
+ Assert.assertEquals("candy", sortedValues[2]);
+ }
+
+ @Test
+ public void testReverseOrder() throws Exception {
+ GenericUDAFEvaluator evaluator = new ReverseOrderedMapEvaluator();
+ ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaBooleanObjectInspector};
+
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+ final String[] values = new String[] {"banana", "apple", "candy"};
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < keys.length; i++) {
+ evaluator.iterate(agg, new Object[] {keys[i], values[i]});
+ }
+
+ SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Object[] sortedValues = res.values().toArray();
+
+ Assert.assertEquals(3, sortedValues.length);
+ Assert.assertEquals("candy", sortedValues[0]);
+ Assert.assertEquals("banana", sortedValues[1]);
+ Assert.assertEquals("apple", sortedValues[2]);
+ }
+
+ @Test
+ public void testTopK() throws Exception {
+ GenericUDAFEvaluator evaluator = new TopKOrderedMapEvaluator();
+ TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaIntObjectInspector};
+
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ int size = 2;
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < keys.length; i++) {
+ evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
+ }
+
+ SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Object[] sortedValues = res.values().toArray();
+
+ Assert.assertEquals(size, sortedValues.length);
+ Assert.assertEquals("candy", sortedValues[0]);
+ Assert.assertEquals("banana", sortedValues[1]);
+ }
+
+ @Test
+ public void testTailK() throws Exception {
+ GenericUDAFEvaluator evaluator = new TailKOrderedMapEvaluator();
+ TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+
+ ObjectInspector[] inputOIs = new ObjectInspector[] {
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaIntObjectInspector};
+
+ final double[] keys = new double[] {0.7, 0.5, 0.8};
+ final String[] values = new String[] {"banana", "apple", "candy"};
+ int size = -2;
+
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
+ evaluator.reset(agg);
+
+ for (int i = 0; i < keys.length; i++) {
+ evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
+ }
+
+ SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Object[] sortedValues = res.values().toArray();
+
+ Assert.assertEquals(Math.abs(size), sortedValues.length);
+ Assert.assertEquals("apple", sortedValues[0]);
+ Assert.assertEquals("banana", sortedValues[1]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/eval/rank.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/eval/rank.md b/docs/gitbook/eval/rank.md
index ed1a44c..db681ac 100644
--- a/docs/gitbook/eval/rank.md
+++ b/docs/gitbook/eval/rank.md
@@ -28,6 +28,11 @@ Practical machine learning applications such as information retrieval and recomm
This page focuses on evaluation of the results from such ranking problems.
+> #### Caution
+> In order to obtain a ranked list of items, this page introduces queries using `to_ordered_map()` such as `map_values(to_ordered_map(score, itemid, true))`. However, this kind of usage has a potential issue: multiple `itemid`-s (i.e., values) that have exactly the same `score` (i.e., key) will be collapsed into a single arbitrary `itemid`, because `to_ordered_map()` creates a key-value map that uses `score` as its key, and a map cannot hold duplicated keys.
+>
+> Hence, if the same map key can be associated with more than one map value, we recommend using `to_ordered_list(value, key, '-reverse')` instead of `map_values(to_ordered_map(key, value, true))`. This alternative approach is available in Hivemall v0.5-rc.1 or later.
+
# Binary Response Measures
In a context of ranking problem, **binary response** means that binary labels are assigned to items, and positive items are considered as *truth* observations.
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/misc/generic_funcs.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/generic_funcs.md b/docs/gitbook/misc/generic_funcs.md
index b3a0421..b27117f 100644
--- a/docs/gitbook/misc/generic_funcs.md
+++ b/docs/gitbook/misc/generic_funcs.md
@@ -83,6 +83,10 @@ This page describes a list of useful Hivemall generic functions.
- `array_sum(array<NUMBER>)` - Returns an array<double> in which each element is summed up
+## List UDAF
+
+- `to_ordered_list(value [, const string options])` or `to_ordered_list(value, key [, const string options])` - Returns a list of values sorted by the value itself or by a specific key
+
# Bitset functions
## Bitset UDF
@@ -141,8 +145,7 @@ The compression level must be in range [-1,9]
- `to_map(key, value)` - Convert two aggregated columns into a key-value map
-- `to_ordered_map(key, value [, const boolean reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
-
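+- `to_ordered_map(key, value [, const int|boolean k|reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map. The optional third argument is either a boolean `reverseOrder` or an int `k` that keeps only the top-`k` (positive `k`) or tail-`k` (negative `k`) entries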
# MapReduce functions
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/misc/topk.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/topk.md b/docs/gitbook/misc/topk.md
index 6a80514..27cf7ad 100644
--- a/docs/gitbook/misc/topk.md
+++ b/docs/gitbook/misc/topk.md
@@ -379,3 +379,66 @@ FROM
| 4 | 0.4432108402252197 | 3 | 26220 | 1 |
| 5 | 0.44323229789733887 | 3 | 18541 | 0 |
| .. | .. | .. | .. | .. |
+
+# Alternative approaches
+
+In order to utilize mapper-side aggregation and reduce computational cost of shuffling, you can use [`to_ordered_map`](./generic_funcs.md#map-udafs) or [`to_ordered_list`](./generic_funcs.md#list-udaf) to get top/tail-k elements instead of `each_top_k`.
+
+As long as `key` is unique within each `id`, the following queries return the same result:
+
+```sql
+with t as (
+ select
+ each_top_k(
+ 10, id, key,
+ id, value
+ ) as (rank, key, id, value)
+ from (
+ select
+ *
+ from
+ test
+ cluster by
+ id
+ ) t
+)
+select
+ id, collect_list(value) as topk
+from
+ t
+group by
+ id
+```
+
+```sql
+with t as (
+ select
+ id, to_ordered_map(key, value, 10) as m
+ from
+ test
+ group by
+ id
+)
+select
+ id, collect_list(value) as topk
+from
+ t
+lateral view explode(m) t as key, value
+group by
+ id
+```
+
+```sql
+select
+ id, to_ordered_list(value, key, '-k 10') as topk
+from
+ test
+group by
+ id
+```
+
+> #### Caution
+>
+> If `key` can be duplicated within an `id`, `to_ordered_map` behaves differently, because a key `K` is always unique in `Map<K, V>`.
+
+Similarly to `each_top_k`, tail-k can also be represented as: `to_ordered_map(key, value, -10)` and `to_ordered_list(value, key, '-k -10')`.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/recommend/item_based_cf.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/recommend/item_based_cf.md b/docs/gitbook/recommend/item_based_cf.md
index 9e4f7e4..053b225 100644
--- a/docs/gitbook/recommend/item_based_cf.md
+++ b/docs/gitbook/recommend/item_based_cf.md
@@ -437,6 +437,11 @@ from (
In order to generate a list of recommended items, you can use either cooccurrence count or similarity as a relevance score.
+> #### Caution
+> In order to obtain a ranked list of items, this section introduces queries using `map_values(to_ordered_map(rank, rec_item))`. However, this kind of usage has a potential issue: multiple `rec_item`-s that have exactly the same `rank` will be collapsed into a single arbitrary `rec_item`, because `to_ordered_map()` creates a key-value map that uses `rank` as its key, and a map cannot hold duplicated keys.
+>
+> Since such a situation can arise when `each_top_k()` is executed for different `userid`-s that have the same `cnt` or `similarity`, we recommend using `to_ordered_list(rec_item, rank, '-reverse')` instead of `map_values(to_ordered_map(rank, rec_item, true))`. This alternative approach is available in Hivemall v0.5-rc.1 or later.
+
### Cooccurrence-based
```sql
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/docs/gitbook/recommend/movielens_cf.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/recommend/movielens_cf.md b/docs/gitbook/recommend/movielens_cf.md
index faa555c..08268a8 100644
--- a/docs/gitbook/recommend/movielens_cf.md
+++ b/docs/gitbook/recommend/movielens_cf.md
@@ -21,6 +21,11 @@
<!-- toc -->
+> #### Caution
+> In order to obtain a ranked list of items, this page introduces queries using `to_ordered_map()` such as `map_values(to_ordered_map(rating, movieid, true))`. However, this kind of usage has a potential issue: multiple `movieid`-s (i.e., values) that have exactly the same `rating` (i.e., key) will be collapsed into a single arbitrary `movieid`, because `to_ordered_map()` creates a key-value map that uses the (possibly duplicated) `rating` as its key.
+>
+> Hence, if a map key could be duplicated across more than one map value, we recommend using `to_ordered_list(value, key, '-reverse')` instead of `map_values(to_ordered_map(key, value, true))`. The alternative approach is available from Hivemall v0.5-rc.1 or later.
+
# Compute movie-movie similarity
[As we explained in the general introduction of item-based CF](item_based_cf.html#dimsum-approximated-all-pairs-cosine-similarity-computation.md), following query finds top-$$k$$ nearest-neighborhood movies for each movie:
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-all-as-permanent.hive
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all-as-permanent.hive b/resources/ddl/define-all-as-permanent.hive
index c2b38fb..cda33f9 100644
--- a/resources/ddl/define-all-as-permanent.hive
+++ b/resources/ddl/define-all-as-permanent.hive
@@ -467,6 +467,13 @@ DROP FUNCTION IF EXISTS to_ordered_map;
CREATE FUNCTION to_ordered_map as 'hivemall.tools.map.UDAFToOrderedMap' USING JAR '${hivemall_jar}';
---------------------
+-- list functions --
+---------------------
+
+DROP FUNCTION IF EXISTS to_ordered_list;
+CREATE FUNCTION to_ordered_list as 'hivemall.tools.list.UDAFToOrderedList' USING JAR '${hivemall_jar}';
+
+---------------------
-- Math functions --
---------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-all.hive
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all.hive b/resources/ddl/define-all.hive
index 89821f8..6e116ac 100644
--- a/resources/ddl/define-all.hive
+++ b/resources/ddl/define-all.hive
@@ -459,6 +459,13 @@ drop temporary function if exists to_ordered_map;
create temporary function to_ordered_map as 'hivemall.tools.map.UDAFToOrderedMap';
---------------------
+-- list functions --
+---------------------
+
+drop temporary function if exists to_ordered_list;
+create temporary function to_ordered_list as 'hivemall.tools.list.UDAFToOrderedList';
+
+---------------------
-- Math functions --
---------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-all.spark
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all.spark b/resources/ddl/define-all.spark
index b4926e3..d3eb3cd 100644
--- a/resources/ddl/define-all.spark
+++ b/resources/ddl/define-all.spark
@@ -458,6 +458,13 @@ sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS to_ordered_map")
sqlContext.sql("CREATE TEMPORARY FUNCTION to_ordered_map AS 'hivemall.tools.map.UDAFToOrderedMap'")
/**
+ * List functions
+ */
+
+sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS to_ordered_list")
+sqlContext.sql("CREATE TEMPORARY FUNCTION to_ordered_list AS 'hivemall.tools.list.UDAFToOrderedList'")
+
+/**
* Math functions
*/
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/9cd3c59a/resources/ddl/define-udfs.td.hql
----------------------------------------------------------------------
diff --git a/resources/ddl/define-udfs.td.hql b/resources/ddl/define-udfs.td.hql
index c7fdd49..2662260 100644
--- a/resources/ddl/define-udfs.td.hql
+++ b/resources/ddl/define-udfs.td.hql
@@ -177,6 +177,7 @@ create temporary function tree_export as 'hivemall.smile.tools.TreeExportUDF';
create temporary function train_ffm as 'hivemall.fm.FieldAwareFactorizationMachineUDTF';
create temporary function ffm_predict as 'hivemall.fm.FFMPredictGenericUDAF';
create temporary function add_field_indicies as 'hivemall.ftvec.trans.AddFieldIndicesUDF';
+create temporary function to_ordered_list as 'hivemall.tools.list.UDAFToOrderedList';
-- NLP features
create temporary function tokenize_ja as 'hivemall.nlp.tokenizer.KuromojiUDF';
[2/5] incubator-hivemall git commit: Fixed to_ordered_map description
Posted by my...@apache.org.
Fixed to_ordered_map description
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/44528a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/44528a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/44528a83
Branch: refs/heads/master
Commit: 44528a83b065aa50255c751fb37041d1adbeb558
Parents: 9cd3c59
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:22:59 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:22:59 2017 +0900
----------------------------------------------------------------------
docs/gitbook/misc/generic_funcs.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/44528a83/docs/gitbook/misc/generic_funcs.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/generic_funcs.md b/docs/gitbook/misc/generic_funcs.md
index b27117f..66b30e2 100644
--- a/docs/gitbook/misc/generic_funcs.md
+++ b/docs/gitbook/misc/generic_funcs.md
@@ -145,7 +145,7 @@ The compression level must be in range [-1,9]
- `to_map(key, value)` - Convert two aggregated columns into a key-value map
-- `to_ordered_map(key, value [, const int|boolean k|reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
+- `to_ordered_map(key, value [, const int k|const boolean reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
# MapReduce functions
[4/5] incubator-hivemall git commit: Refactored to_ordered_list and
to_ordered_map UDAF
Posted by my...@apache.org.
Refactored to_ordered_list and to_ordered_map UDAF
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/69730f65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/69730f65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/69730f65
Branch: refs/heads/master
Commit: 69730f65d76b54890b141ee13567eb681c0374ae
Parents: 21a8781
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:26:49 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:27:23 2017 +0900
----------------------------------------------------------------------
.../hivemall/tools/list/UDAFToOrderedList.java | 116 +++++-----
.../hivemall/tools/map/UDAFToOrderedMap.java | 221 +++----------------
.../tools/list/UDAFToOrderedListTest.java | 32 ++-
.../tools/map/UDAFToOrderedMapTest.java | 40 ++--
4 files changed, 132 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
index 16c966a..e88a16c 100644
--- a/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
+++ b/core/src/main/java/hivemall/tools/list/UDAFToOrderedList.java
@@ -21,6 +21,23 @@ package hivemall.tools.list;
import hivemall.utils.collections.BoundedPriorityQueue;
import hivemall.utils.hadoop.HiveUtils;
import hivemall.utils.lang.CommandLineUtils;
+import hivemall.utils.lang.NaturalComparator;
+import hivemall.utils.lang.Preconditions;
+import hivemall.utils.struct.Pair;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -33,25 +50,26 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.*;
-
/**
* Return list of values sorted by value itself or specific key.
*/
-@Description(
- name = "to_ordered_list",
- value = "_FUNC_(value [, key, const string options]) - Return list of values sorted by value itself or specific key")
-public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
+@Description(name = "to_ordered_list",
+ value = "_FUNC_(PRIMITIVE value [, PRIMITIVE key, const string options])"
+ + " - Return list of values sorted by value itself or specific key")
+public final class UDAFToOrderedList extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
@@ -151,7 +169,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
int k = 0;
boolean reverseOrder = false;
-
if (argOIs.length >= optionIndex + 1) {
String rawArgs = HiveUtils.getConstString(argOIs[optionIndex]);
cl = parseOptions(rawArgs);
@@ -161,18 +178,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
if (cl.hasOption("k")) {
k = Integer.parseInt(cl.getOptionValue("k"));
if (k == 0) {
- throw new UDFArgumentException("`k` must be nonzero: " + k);
+ throw new UDFArgumentException("`k` must be non-zero value: " + k);
}
}
}
-
this.size = Math.abs(k);
- if ((k > 0 && reverseOrder) || (k < 0 && !reverseOrder) || (k == 0 && !reverseOrder)) {
- // reverse top-k, natural tail-k = ascending = natural order output = reverse order priority queue
+ if ((k > 0 && reverseOrder) || (k < 0 && reverseOrder == false)
+ || (k == 0 && reverseOrder == false)) {
+ // top-k on reverse order = tail-k on natural order (so, top-k on descending)
this.reverseOrder = true;
- } else { // (k > 0 && !reverseOrder) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
- // natural top-k or reverse tail-k = descending = reverse order output = natural order priority queue
+ } else { // (k > 0 && reverseOrder == false) || (k < 0 && reverseOrder) || (k == 0 && reverseOrder)
+ // top-k on natural order = tail-k on reverse order (so, top-k on ascending)
this.reverseOrder = false;
}
@@ -190,7 +207,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
|| (argOIs.length == 3 && HiveUtils.isConstString(argOIs[2]));
if (sortByKey) {
- this.valueOI = argOIs[0];
+ this.valueOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
this.keyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[1]);
} else {
// sort values by value itself
@@ -230,20 +247,18 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
return outputOI;
}
+ @Nonnull
private static StructObjectInspector internalMergeOI(@Nonnull ObjectInspector valueOI,
@Nonnull PrimitiveObjectInspector keyOI) {
- ArrayList<String> fieldNames = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ List<String> fieldNames = new ArrayList<String>();
+ List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("valueList");
fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
fieldNames.add("keyList");
fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(keyOI)));
-
fieldNames.add("size");
fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
fieldNames.add("reverseOrder");
fieldOIs.add(PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
@@ -295,10 +310,10 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
throws HiveException {
QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
- Map<String, List<Object>> tuples = myagg.drainQueue();
- List<Object> valueList = tuples.get("value");
- List<Object> keyList = tuples.get("key");
- if (valueList.size() == 0) {
+ Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+ List<Object> keyList = tuples.getKey();
+ List<Object> valueList = tuples.getValue();
+ if (valueList.isEmpty()) {
return null;
}
@@ -307,7 +322,6 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
partialResult[1] = keyList;
partialResult[2] = new IntWritable(myagg.size);
partialResult[3] = new BooleanWritable(myagg.reverseOrder);
-
return partialResult;
}
@@ -345,11 +359,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
@Override
- public Object terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+ public List<Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
QueueAggregationBuffer myagg = (QueueAggregationBuffer) agg;
- Map<String, List<Object>> tuples = myagg.drainQueue();
- return tuples.get("value");
+ Pair<List<Object>, List<Object>> tuples = myagg.drainQueue();
+ return tuples.getValue();
}
static class QueueAggregationBuffer extends AbstractAggregationBuffer {
@@ -374,14 +388,14 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
this.reverseOrder = reverseOrder;
}
- void iterate(TupleWithKey tuple) {
+ void iterate(@Nonnull TupleWithKey tuple) {
if (queueHandler == null) {
initQueueHandler();
}
queueHandler.offer(tuple);
}
- void merge(List<Object> o_keyList, List<Object> o_valueList) {
+ void merge(@Nonnull List<Object> o_keyList, @Nonnull List<Object> o_valueList) {
if (queueHandler == null) {
initQueueHandler();
}
@@ -391,7 +405,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
@Nonnull
- Map<String, List<Object>> drainQueue() {
+ Pair<List<Object>, List<Object>> drainQueue() {
int n = queueHandler.size();
final Object[] keys = new Object[n];
final Object[] values = new Object[n];
@@ -402,10 +416,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
queueHandler.clear();
- Map<String, List<Object>> res = new HashMap<String, List<Object>>();
- res.put("key", Arrays.asList(keys));
- res.put("value", Arrays.asList(values));
- return res;
+ return Pair.of(Arrays.asList(keys), Arrays.asList(values));
}
private void initQueueHandler() {
@@ -413,12 +424,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
if (reverseOrder) {
comparator = Collections.reverseOrder();
} else {
- comparator = new Comparator<TupleWithKey>() {
- @Override
- public int compare(TupleWithKey o1, TupleWithKey o2) {
- return o1.compareTo(o2);
- }
- };
+ comparator = NaturalComparator.getInstance();
}
if (size > 0) {
@@ -436,10 +442,11 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
*/
private static abstract class AbstractQueueHandler {
- abstract void offer(TupleWithKey tuple);
+ abstract void offer(@Nonnull TupleWithKey tuple);
abstract int size();
+ @Nullable
abstract TupleWithKey poll();
abstract void clear();
@@ -450,6 +457,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
private static final int DEFAULT_INITIAL_CAPACITY = 11; // same as PriorityQueue
+ @Nonnull
private final PriorityQueue<TupleWithKey> queue;
QueueHandler(@Nonnull Comparator<TupleWithKey> comparator) {
@@ -480,6 +488,7 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
private static final class BoundedQueueHandler extends AbstractQueueHandler {
+ @Nonnull
private final BoundedPriorityQueue<TupleWithKey> queue;
BoundedQueueHandler(int size, @Nonnull Comparator<TupleWithKey> comparator) {
@@ -509,24 +518,29 @@ public class UDAFToOrderedList extends AbstractGenericUDAFResolver {
}
private static final class TupleWithKey implements Comparable<TupleWithKey> {
- private Object key;
- private Object value;
+ @Nonnull
+ private final Object key;
+ @Nonnull
+ private final Object value;
- TupleWithKey(Object key, Object value) {
- this.key = key;
- this.value = value;
+ TupleWithKey(@CheckForNull Object key, @CheckForNull Object value) {
+ this.key = Preconditions.checkNotNull(key);
+ this.value = Preconditions.checkNotNull(value);
}
+ @Nonnull
Object getKey() {
return key;
}
+ @Nonnull
Object getValue() {
return value;
}
@Override
public int compareTo(TupleWithKey o) {
+ @SuppressWarnings("unchecked")
Comparable<? super Object> k = (Comparable<? super Object>) key;
return k.compareTo(o.getKey());
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index 3e6caa4..a6b547f 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -18,14 +18,14 @@
*/
package hivemall.tools.map;
+import hivemall.utils.collections.maps.BoundedSortedMap;
import hivemall.utils.hadoop.HiveUtils;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Map;
-import java.util.SortedMap;
import java.util.TreeMap;
+import javax.annotation.Nonnegative;
+
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
@@ -34,17 +34,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.IntWritable;
-
-import javax.annotation.Nonnull;
/**
* Convert two aggregated columns into a sorted key-value map.
@@ -52,13 +42,13 @@ import javax.annotation.Nonnull;
@Description(name = "to_ordered_map",
value = "_FUNC_(key, value [, const int k|const boolean reverseOrder=false]) "
+ "- Convert two aggregated columns into an ordered key-value map")
-public class UDAFToOrderedMap extends UDAFToMap {
+public final class UDAFToOrderedMap extends UDAFToMap {
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
throws SemanticException {
@SuppressWarnings("deprecation")
- TypeInfo[] typeInfo = info.getParameters();
+ final TypeInfo[] typeInfo = info.getParameters();
if (typeInfo.length != 2 && typeInfo.length != 3) {
throw new UDFArgumentTypeException(typeInfo.length - 1,
"Expecting two or three arguments: " + typeInfo.length);
@@ -81,6 +71,7 @@ public class UDAFToOrderedMap extends UDAFToMap {
throw new UDFArgumentException("Map size must be nonzero: " + size);
}
reverseOrder = (size > 0); // positive size => top-k
+ size = Math.abs(size);
} else {
throw new UDFArgumentTypeException(2,
"The third argument must be boolean or integer type: "
@@ -89,205 +80,53 @@ public class UDAFToOrderedMap extends UDAFToMap {
}
if (reverseOrder) { // descending
- if (size != 0) {
- return new TopKOrderedMapEvaluator();
- }
- return new ReverseOrderedMapEvaluator();
+ return new DescendingMapEvaluator(size);
} else { // ascending
- if (size != 0) {
- return new TailKOrderedMapEvaluator();
- }
- return new NaturalOrderedMapEvaluator();
+ return new AscendingMapEvaluator(size);
}
}
- public static class NaturalOrderedMapEvaluator extends UDAFToMapEvaluator {
-
- @Override
- public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
- }
-
- }
+ public static final class AscendingMapEvaluator extends UDAFToMapEvaluator {
- public static class ReverseOrderedMapEvaluator extends UDAFToMapEvaluator {
+ private final int size;
- @Override
- public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
- Collections.reverseOrder());
- }
-
- }
-
- public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
-
- protected PrimitiveObjectInspector inputKeyOI;
- protected ObjectInspector inputValueOI;
- protected StandardMapObjectInspector partialMapOI;
- protected PrimitiveObjectInspector sizeOI;
-
- protected StructObjectInspector internalMergeOI;
-
- protected StructField partialMapField;
- protected StructField sizeField;
-
- @Override
- public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
- super.init(mode, argOIs);
-
- // initialize input
- if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
- this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
- this.inputValueOI = argOIs[1];
- this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
- } else {// from partial aggregation
- StructObjectInspector soi = (StructObjectInspector) argOIs[0];
- this.internalMergeOI = soi;
-
- this.partialMapField = soi.getStructFieldRef("partialMap");
- // re-extract input key/value OIs
- StandardMapObjectInspector partialMapOI = (StandardMapObjectInspector) partialMapField.getFieldObjectInspector();
- this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(partialMapOI.getMapKeyObjectInspector());
- this.inputValueOI = partialMapOI.getMapValueObjectInspector();
-
- this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
- ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
- ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
-
- this.sizeField = soi.getStructFieldRef("size");
- this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
- }
-
- // initialize output
- final ObjectInspector outputOI;
- if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
- outputOI = internalMergeOI(inputKeyOI, inputValueOI);
- } else {// terminate
- outputOI = ObjectInspectorFactory.getStandardMapObjectInspector(
- ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
- ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
- }
- return outputOI;
- }
-
- private static StructObjectInspector internalMergeOI(
- @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
- ArrayList<String> fieldNames = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-
- fieldNames.add("partialMap");
- fieldOIs.add(ObjectInspectorFactory.getStandardMapObjectInspector(
- ObjectInspectorUtils.getStandardObjectInspector(keyOI),
- ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
-
- fieldNames.add("size");
- fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-
- return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
- }
-
- static class MapAggregationBuffer extends AbstractAggregationBuffer {
- Map<Object, Object> container;
- int size;
-
- MapAggregationBuffer() {
- super();
- }
+ AscendingMapEvaluator(@Nonnegative int size) {
+ super();
+ this.size = size;
}
@Override
public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- myagg.container = new TreeMap<Object, Object>(Collections.reverseOrder());
- myagg.size = Integer.MAX_VALUE;
- }
-
- @Override
- public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
- MapAggregationBuffer myagg = new MapAggregationBuffer();
- reset(myagg);
- return myagg;
- }
-
- @Override
- public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
- Object[] parameters) throws HiveException {
- assert (parameters.length == 3);
-
- if (parameters[0] == null) {
- return;
+ if (size == 0) {
+ ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
+ } else {
+ ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size);
}
-
- Object key = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputKeyOI);
- Object value = ObjectInspectorUtils.copyToStandardObject(parameters[1], inputValueOI);
- int size = Math.abs(HiveUtils.getInt(parameters[2], sizeOI)); // size could be negative for tail-k
-
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- myagg.container.put(key, value);
- myagg.size = size;
}
- @Override
- public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
-
- Object[] partialResult = new Object[2];
- partialResult[0] = myagg.container;
- partialResult[1] = new IntWritable(myagg.size);
-
- return partialResult;
- }
+ }
- @Override
- public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
- throws HiveException {
- if (partial == null) {
- return;
- }
+ public static final class DescendingMapEvaluator extends UDAFToMapEvaluator {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+ private final int size;
- Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
- Map<?, ?> partialMap = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
- for (Map.Entry<?, ?> e : partialMap.entrySet()) {
- Object key = ObjectInspectorUtils.copyToStandardObject(e.getKey(), inputKeyOI);
- Object value = ObjectInspectorUtils.copyToStandardObject(e.getValue(), inputValueOI);
- myagg.container.put(key, value);
- }
-
- Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
- int size = HiveUtils.getInt(sizeObj, sizeOI);
- myagg.size = size;
+ DescendingMapEvaluator(int size) {
+ super();
+ this.size = size;
}
@Override
- public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
- throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- if (myagg.size < myagg.container.size()) {
- Object toKey = myagg.container.keySet().toArray()[myagg.size];
- return ((SortedMap<Object, Object>) myagg.container).headMap(toKey);
- }
- return myagg.container;
- }
-
- }
-
- public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
-
- @Override
public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
- MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
- myagg.container = new TreeMap<Object, Object>();
- myagg.size = Integer.MAX_VALUE;
+ if (size == 0) {
+ ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
+ Collections.reverseOrder());
+ } else {
+ ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size,
+ true);
+ }
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
index f466dbc..c3039d1 100644
--- a/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
+++ b/core/src/test/java/hivemall/tools/list/UDAFToOrderedListTest.java
@@ -21,22 +21,20 @@ package hivemall.tools.list;
import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator;
import hivemall.tools.list.UDAFToOrderedList.UDAFToOrderedListEvaluator.QueueAggregationBuffer;
+import java.util.List;
+
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.List;
-
-@SuppressWarnings("deprecation")
public class UDAFToOrderedListTest {
- GenericUDAFEvaluator evaluator;
- QueueAggregationBuffer agg;
+ private UDAFToOrderedListEvaluator evaluator;
+ private QueueAggregationBuffer agg;
@Before
public void setUp() throws Exception {
@@ -57,7 +55,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(3, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -81,7 +79,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(3, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -105,7 +103,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -129,7 +127,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -152,7 +150,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -176,7 +174,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -199,7 +197,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(3, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -257,7 +255,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
@@ -283,7 +281,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -308,7 +306,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("apple", res.get(0));
@@ -334,7 +332,7 @@ public class UDAFToOrderedListTest {
evaluator.iterate(agg, new Object[] {values[i], keys[i]});
}
- List<Object> res = (List<Object>) evaluator.terminate(agg);
+ List<Object> res = evaluator.terminate(agg);
Assert.assertEquals(2, res.size());
Assert.assertEquals("candy", res.get(0));
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/69730f65/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
index 9289a02..61642f1 100644
--- a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -18,27 +18,23 @@
*/
package hivemall.tools.map;
-import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.AscendingMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.DescendingMapEvaluator;
+
+import java.util.SortedMap;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-
import org.junit.Assert;
import org.junit.Test;
-import java.util.SortedMap;
-
-@SuppressWarnings("deprecation")
public class UDAFToOrderedMapTest {
@Test
public void testNaturalOrder() throws Exception {
- GenericUDAFEvaluator evaluator = new NaturalOrderedMapEvaluator();
- NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ AscendingMapEvaluator evaluator = new AscendingMapEvaluator(0);
+ AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -61,12 +57,14 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals("apple", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
Assert.assertEquals("candy", sortedValues[2]);
+
+ evaluator.close();
}
@Test
public void testReverseOrder() throws Exception {
- GenericUDAFEvaluator evaluator = new ReverseOrderedMapEvaluator();
- ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ DescendingMapEvaluator evaluator = new DescendingMapEvaluator(0);
+ DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -90,12 +88,15 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals("candy", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
Assert.assertEquals("apple", sortedValues[2]);
+
+ evaluator.close();
}
@Test
public void testTopK() throws Exception {
- GenericUDAFEvaluator evaluator = new TopKOrderedMapEvaluator();
- TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ int size = 2;
+ DescendingMapEvaluator evaluator = new DescendingMapEvaluator(size);
+ DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -104,7 +105,6 @@ public class UDAFToOrderedMapTest {
final double[] keys = new double[] {0.7, 0.5, 0.8};
final String[] values = new String[] {"banana", "apple", "candy"};
- int size = 2;
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
evaluator.reset(agg);
@@ -119,12 +119,15 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals(size, sortedValues.length);
Assert.assertEquals("candy", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
+
+ evaluator.close();
}
@Test
public void testTailK() throws Exception {
- GenericUDAFEvaluator evaluator = new TailKOrderedMapEvaluator();
- TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ int size = -2;
+ AscendingMapEvaluator evaluator = new AscendingMapEvaluator(Math.abs(size));
+ AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -133,7 +136,6 @@ public class UDAFToOrderedMapTest {
final double[] keys = new double[] {0.7, 0.5, 0.8};
final String[] values = new String[] {"banana", "apple", "candy"};
- int size = -2;
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
evaluator.reset(agg);
@@ -148,6 +150,8 @@ public class UDAFToOrderedMapTest {
Assert.assertEquals(Math.abs(size), sortedValues.length);
Assert.assertEquals("apple", sortedValues[0]);
Assert.assertEquals("banana", sortedValues[1]);
+
+ evaluator.close();
}
}
[3/5] incubator-hivemall git commit: Added utility classes and methods
Posted by my...@apache.org.
Added utility classes and methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/21a87814
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/21a87814
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/21a87814
Branch: refs/heads/master
Commit: 21a87814895c9c63479a049387dd53b34cc5c56e
Parents: 44528a8
Author: Makoto Yui <my...@apache.org>
Authored: Tue Sep 12 19:24:25 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Tue Sep 12 19:24:25 2017 +0900
----------------------------------------------------------------------
.../collections/maps/BoundedSortedMap.java | 59 ++++++++++
.../hivemall/utils/lang/NaturalComparator.java | 48 ++++++++
.../java/hivemall/utils/lang/StringUtils.java | 17 +++
.../main/java/hivemall/utils/struct/Pair.java | 38 +++++++
.../collections/BoundedPriorityQueueTest.java | 114 +++++++++++++++++++
.../collections/maps/BoundedSortedMapTest.java | 84 ++++++++++++++
6 files changed, 360 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java b/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java
new file mode 100644
index 0000000..b1bf806
--- /dev/null
+++ b/core/src/main/java/hivemall/utils/collections/maps/BoundedSortedMap.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.collections.maps;
+
+import hivemall.utils.lang.Preconditions;
+
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nullable;
+
+public final class BoundedSortedMap<K, V> extends TreeMap<K, V> { // TreeMap whose put() evicts the last entry whenever the map grows beyond 'bound' entries
+ private static final long serialVersionUID = 4580890152997313541L;
+
+ private final int bound; // maximum number of entries left in the map after a put()
+
+ public BoundedSortedMap(@Nonnegative int size) { // bounded map using the natural ordering of keys
+ this(size, false);
+ }
+
+ public BoundedSortedMap(@Nonnegative int size, boolean reverseOrder) { // reverseOrder=true sorts keys with Collections.reverseOrder()
+ super(reverseOrder ? Collections.reverseOrder() : null); // a null comparator makes TreeMap fall back to natural key ordering
+ Preconditions.checkArgument(size > 0, "size must be greater than zero: " + size); // NOTE(review): presumably throws for size <= 0; Preconditions is not visible here
+ this.bound = size;
+ }
+
+ @Nullable
+ public V put(@CheckForNull final K key, @Nullable final V value) { // returns the evicted entry's value on overflow, otherwise the previous value mapped to key
+ final V old = super.put(key, value);
+ if (size() > bound) { // the insertion pushed the map past its bound
+ Entry<K, V> e = pollLastEntry(); // remove the entry that sorts last under this map's ordering
+ if (e == null) {
+ return null;
+ }
+ return e.getValue(); // the evicted entry can be the one that was just inserted
+ }
+ return old;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/lang/NaturalComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/lang/NaturalComparator.java b/core/src/main/java/hivemall/utils/lang/NaturalComparator.java
new file mode 100644
index 0000000..d451f1b
--- /dev/null
+++ b/core/src/main/java/hivemall/utils/lang/NaturalComparator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.lang;
+
+import java.util.Comparator;
+
+import javax.annotation.Nonnull;
+
+public final class NaturalComparator<T extends Comparable<? super T>> implements Comparator<T> { // Comparator that delegates to the elements' own compareTo()
+
+ @SuppressWarnings("rawtypes")
+ private final static NaturalComparator INSTANCE = new NaturalComparator(); // shared raw instance handed out by getInstance()
+
+ private NaturalComparator() {}// private: instances are obtained via getInstance() or newInstance()
+
+ @Override
+ public int compare(T o1, T o2) { // no null handling: a null o1 fails with NullPointerException
+ return o1.compareTo(o2);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Nonnull
+ public final static <T extends Comparable<? super T>> Comparator<T> getInstance() { // returns the shared INSTANCE cast to the requested element type
+ return (Comparator<T>) INSTANCE;
+ }
+
+ @Nonnull
+ public final static <T extends Comparable<? super T>> Comparator<T> newInstance() { // returns a newly allocated comparator on every call
+ return new NaturalComparator<T>();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/lang/StringUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/lang/StringUtils.java b/core/src/main/java/hivemall/utils/lang/StringUtils.java
index 48e137f..5b66dd1 100644
--- a/core/src/main/java/hivemall/utils/lang/StringUtils.java
+++ b/core/src/main/java/hivemall/utils/lang/StringUtils.java
@@ -250,5 +250,22 @@ public final class StringUtils {
return builder.toString();
}
+ public static int compare(@Nullable final String o1, @Nullable final String o2) { // null-safe comparison in which null sorts before any non-null string
+ return compare(o1, o2, true); // nullIsLess = true
+ }
+
+ public static int compare(@Nullable final String o1, @Nullable final String o2, // null-safe String comparison
+ final boolean nullIsLess) { // true: null sorts before non-null values; false: null sorts after them
+ if (o1 == o2) { // same reference, which also covers both arguments being null
+ return 0;
+ }
+ if (o1 == null) { // only o1 is null
+ return nullIsLess ? -1 : 1;
+ }
+ if (o2 == null) { // only o2 is null
+ return nullIsLess ? 1 : -1;
+ }
+ return o1.compareTo(o2); // both non-null: String's own ordering
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/main/java/hivemall/utils/struct/Pair.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/struct/Pair.java b/core/src/main/java/hivemall/utils/struct/Pair.java
new file mode 100644
index 0000000..17737ed
--- /dev/null
+++ b/core/src/main/java/hivemall/utils/struct/Pair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.struct;
+
+import java.util.AbstractMap;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+public class Pair<K, V> extends AbstractMap.SimpleEntry<K, V> { // key/value pair built on AbstractMap.SimpleEntry
+ private static final long serialVersionUID = 6411527075103472113L;
+
+ public Pair(@Nullable K key, @Nullable V value) { // both key and value may be null
+ super(key, value);
+ }
+
+ @Nonnull
+ public static <K, V> Pair<K, V> of(@Nullable K key, @Nullable V value) { // static factory equivalent to new Pair<>(key, value)
+ return new Pair<>(key, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java b/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java
new file mode 100644
index 0000000..1220d76
--- /dev/null
+++ b/core/src/test/java/hivemall/utils/collections/BoundedPriorityQueueTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.collections;
+
+import hivemall.utils.lang.NaturalComparator;
+import hivemall.utils.lang.StringUtils;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BoundedPriorityQueueTest { // exercises BoundedPriorityQueue (defined elsewhere) with Integer and String elements
+
+ @Test
+ public void testTop3() { // bound 3 with an ascending Integer comparator: the three largest offers are expected to remain
+ BoundedPriorityQueue<Integer> queue = new BoundedPriorityQueue<Integer>(3,
+ new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return Integer.compare(o1, o2);
+ }
+ });
+ Assert.assertTrue(queue.offer(1));
+ Assert.assertTrue(queue.offer(4));
+ Assert.assertTrue(queue.offer(3));
+ Assert.assertTrue(queue.offer(2)); // fourth offer into a bound-3 queue is still accepted
+ Assert.assertFalse(queue.offer(1)); // offer is expected to report false: 1 is not accepted
+ Assert.assertTrue(queue.offer(2));
+ Assert.assertTrue(queue.offer(3));
+
+ Assert.assertEquals(3, queue.size()); // size never exceeds the bound
+
+ Assert.assertEquals(Integer.valueOf(3), queue.peek()); // head is the smallest retained element
+ Assert.assertEquals(Integer.valueOf(3), queue.poll());
+ Assert.assertEquals(Integer.valueOf(3), queue.poll()); // duplicates are kept as separate elements
+ Assert.assertEquals(Integer.valueOf(4), queue.poll());
+ Assert.assertNull(queue.poll()); // empty queue polls null
+ Assert.assertEquals(0, queue.size());
+ }
+
+ @Test
+ public void testTail3() { // bound 3 with the reversed comparator: the three smallest offers are expected to remain
+ BoundedPriorityQueue<Integer> queue = new BoundedPriorityQueue<Integer>(3,
+ Collections.<Integer>reverseOrder());
+ Assert.assertTrue(queue.offer(1));
+ Assert.assertTrue(queue.offer(4));
+ Assert.assertTrue(queue.offer(3));
+ Assert.assertTrue(queue.offer(2));
+ Assert.assertTrue(queue.offer(1));
+ Assert.assertTrue(queue.offer(2));
+ Assert.assertFalse(queue.offer(3)); // offer is expected to report false: 3 is not accepted
+
+ Assert.assertEquals(3, queue.size());
+
+ Assert.assertEquals(Integer.valueOf(2), queue.peek()); // head is the largest retained element under reverse order
+ Assert.assertEquals(Integer.valueOf(2), queue.poll());
+ Assert.assertEquals(Integer.valueOf(1), queue.poll());
+ Assert.assertEquals(Integer.valueOf(1), queue.poll());
+ Assert.assertNull(queue.poll());
+ Assert.assertEquals(0, queue.size());
+ }
+
+ @Test
+ public void testString1() { // String elements ordered through StringUtils.compare
+ BoundedPriorityQueue<String> queue = new BoundedPriorityQueue<>(3,
+ new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return StringUtils.compare(o1, o2);
+ }
+ });
+ queue.offer("B");
+ queue.offer("A");
+ queue.offer("C");
+ queue.offer("D"); // fourth offer into a bound-3 queue
+ Assert.assertEquals("B", queue.poll()); // "A" is no longer present
+ Assert.assertEquals("C", queue.poll());
+ Assert.assertEquals("D", queue.poll());
+ Assert.assertNull(queue.poll());
+ }
+
+ @Test
+ public void testString2() { // same scenario as testString1, ordered through NaturalComparator.getInstance()
+ BoundedPriorityQueue<String> queue = new BoundedPriorityQueue<>(3,
+ NaturalComparator.<String>getInstance());
+ queue.offer("B");
+ queue.offer("A");
+ queue.offer("C");
+ queue.offer("D"); // fourth offer into a bound-3 queue
+ Assert.assertEquals("B", queue.poll()); // "A" is no longer present
+ Assert.assertEquals("C", queue.poll());
+ Assert.assertEquals("D", queue.poll());
+ Assert.assertNull(queue.poll());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/21a87814/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java b/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java
new file mode 100644
index 0000000..ce376cf
--- /dev/null
+++ b/core/src/test/java/hivemall/utils/collections/maps/BoundedSortedMapTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.utils.collections.maps;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BoundedSortedMapTest { // checks put() return values and final contents of BoundedSortedMap in both key orders
+
+ @Test
+ public void testNaturalOrderTop3() {
+ // natural order = ascending
+ SortedMap<Integer, Double> map = new BoundedSortedMap<Integer, Double>(3);
+ Assert.assertNull(map.put(1, 1.d)); // new key, no eviction: no previous value
+ Assert.assertEquals(Double.valueOf(1.d), map.put(1, 1.1d)); // existing key: previous value is returned
+ Assert.assertNull(map.put(4, 4.d));
+ Assert.assertNull(map.put(2, 2.d));
+ Assert.assertEquals(Double.valueOf(2.d), map.put(2, 2.2d));
+ Assert.assertEquals(Double.valueOf(4.d), map.put(3, 3.d)); // fourth key exceeds the bound: last key 4 is evicted and its value returned
+ Assert.assertEquals(Double.valueOf(3.d), map.put(3, 3.3d)); // key 3 survived the eviction, so this is a plain replacement
+
+ Assert.assertEquals(3, map.size());
+
+ Iterator<Entry<Integer, Double>> itor = map.entrySet().iterator(); // iterates keys in ascending order: 1, 2, 3
+ Entry<Integer, Double> e = itor.next();
+ Assert.assertEquals(Integer.valueOf(1), e.getKey());
+ Assert.assertEquals(Double.valueOf(1.1d), e.getValue());
+ e = itor.next();
+ Assert.assertEquals(Integer.valueOf(2), e.getKey());
+ Assert.assertEquals(Double.valueOf(2.2d), e.getValue());
+ e = itor.next();
+ Assert.assertEquals(Integer.valueOf(3), e.getKey());
+ Assert.assertEquals(Double.valueOf(3.3d), e.getValue());
+ Assert.assertFalse(itor.hasNext());
+ }
+
+ @Test
+ public void testReverseOrderTop3() {
+ // reverse order = descending
+ SortedMap<Integer, Double> map = new BoundedSortedMap<Integer, Double>(3, true);
+ Assert.assertNull(map.put(1, 1.d)); // new key, no eviction: no previous value
+ Assert.assertEquals(Double.valueOf(1.d), map.put(1, 1.1d)); // existing key: previous value is returned
+ Assert.assertNull(map.put(4, 4.d));
+ Assert.assertNull(map.put(2, 2.d));
+ Assert.assertEquals(Double.valueOf(2.d), map.put(2, 2.2d));
+ Assert.assertEquals(Double.valueOf(1.1d), map.put(3, 3.d)); // fourth key exceeds the bound: key 1 sorts last in descending order and is evicted
+ Assert.assertEquals(Double.valueOf(3.d), map.put(3, 3.3d)); // plain replacement of key 3
+
+ Assert.assertEquals(3, map.size());
+
+ Iterator<Entry<Integer, Double>> itor = map.entrySet().iterator(); // iterates keys in descending order: 4, 3, 2
+ Entry<Integer, Double> e = itor.next();
+ Assert.assertEquals(Integer.valueOf(4), e.getKey());
+ Assert.assertEquals(Double.valueOf(4.d), e.getValue());
+ e = itor.next();
+ Assert.assertEquals(Integer.valueOf(3), e.getKey());
+ Assert.assertEquals(Double.valueOf(3.3d), e.getValue());
+ e = itor.next();
+ Assert.assertEquals(Integer.valueOf(2), e.getKey());
+ Assert.assertEquals(Double.valueOf(2.2d), e.getValue());
+ Assert.assertFalse(itor.hasNext());
+ }
+
+}
[5/5] incubator-hivemall git commit: Close #114: refactored
to_ordered_(map|list) of PR #108
Posted by my...@apache.org.
Close #114: refactored to_ordered_(map|list) of PR #108
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/688daa5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/688daa5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/688daa5f
Branch: refs/heads/master
Commit: 688daa5f8e6a87fad2abf3b47a8a8353bc1c792a
Parents: 69730f6
Author: Makoto Yui <my...@apache.org>
Authored: Wed Sep 13 21:12:41 2017 +0900
Committer: Makoto Yui <my...@apache.org>
Committed: Wed Sep 13 21:12:41 2017 +0900
----------------------------------------------------------------------
.../hivemall/tools/map/UDAFToOrderedMap.java | 240 ++++++++++++++++---
.../java/hivemall/utils/hadoop/HiveUtils.java | 12 +
.../tools/map/UDAFToOrderedMapTest.java | 36 +--
docs/gitbook/misc/generic_funcs.md | 54 ++++-
4 files changed, 292 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
index a6b547f..5cdac4d 100644
--- a/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
+++ b/core/src/main/java/hivemall/tools/map/UDAFToOrderedMap.java
@@ -20,11 +20,17 @@ package hivemall.tools.map;
import hivemall.utils.collections.maps.BoundedSortedMap;
import hivemall.utils.hadoop.HiveUtils;
+import hivemall.utils.lang.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -33,8 +39,16 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.IntWritable;
/**
* Convert two aggregated columns into a sorted key-value map.
@@ -63,70 +77,232 @@ public final class UDAFToOrderedMap extends UDAFToMap {
int size = 0;
if (typeInfo.length == 3) {
ObjectInspector[] argOIs = info.getParameterObjectInspectors();
- if (HiveUtils.isBooleanTypeInfo(typeInfo[2])) {
- reverseOrder = HiveUtils.getConstBoolean(argOIs[2]);
- } else if (HiveUtils.isIntegerTypeInfo(typeInfo[2])) {
- size = HiveUtils.getConstInt(argOIs[2]);
+ ObjectInspector argOI2 = argOIs[2];
+ if (HiveUtils.isConstBoolean(argOI2)) {
+ reverseOrder = HiveUtils.getConstBoolean(argOI2);
+ } else if (HiveUtils.isConstInteger(argOI2)) {
+ size = HiveUtils.getConstInt(argOI2);
if (size == 0) {
- throw new UDFArgumentException("Map size must be nonzero: " + size);
+ throw new UDFArgumentException("Map size must be non-zero value: " + size);
}
reverseOrder = (size > 0); // positive size => top-k
- size = Math.abs(size);
} else {
throw new UDFArgumentTypeException(2,
- "The third argument must be boolean or integer type: "
- + typeInfo[2].getTypeName());
+ "The third argument must be boolean or int type: " + typeInfo[2].getTypeName());
}
}
if (reverseOrder) { // descending
- return new DescendingMapEvaluator(size);
+ if (size == 0) {
+ return new ReverseOrderedMapEvaluator();
+ } else {
+ return new TopKOrderedMapEvaluator();
+ }
} else { // ascending
- return new AscendingMapEvaluator(size);
+ if (size == 0) {
+ return new NaturalOrderedMapEvaluator();
+ } else {
+ return new TailKOrderedMapEvaluator();
+ }
}
}
- public static final class AscendingMapEvaluator extends UDAFToMapEvaluator {
-
- private final int size;
+ public static class NaturalOrderedMapEvaluator extends UDAFToMapEvaluator {
- AscendingMapEvaluator(@Nonnegative int size) {
- super();
- this.size = size;
+ @Override
+ public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
+ throws HiveException {
+ ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
}
+ }
+
+ public static class ReverseOrderedMapEvaluator extends UDAFToMapEvaluator {
+
@Override
public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
- if (size == 0) {
- ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>();
- } else {
- ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size);
- }
+ ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
+ Collections.reverseOrder());
}
}
- public static final class DescendingMapEvaluator extends UDAFToMapEvaluator {
+ public static class TopKOrderedMapEvaluator extends GenericUDAFEvaluator {
+
+ protected PrimitiveObjectInspector inputKeyOI;
+ protected ObjectInspector inputValueOI;
+ protected MapObjectInspector partialMapOI;
+ protected PrimitiveObjectInspector sizeOI;
+
+ protected StructObjectInspector internalMergeOI;
+
+ protected StructField partialMapField;
+ protected StructField sizeField;
- private final int size;
+ @Override
+ public ObjectInspector init(Mode mode, ObjectInspector[] argOIs) throws HiveException {
+ super.init(mode, argOIs);
+
+ // initialize input
+ if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {// from original data
+ this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(argOIs[0]);
+ this.inputValueOI = argOIs[1];
+ this.sizeOI = HiveUtils.asIntegerOI(argOIs[2]);
+ } else {// from partial aggregation
+ StructObjectInspector soi = (StructObjectInspector) argOIs[0];
+ this.internalMergeOI = soi;
+
+ this.partialMapField = soi.getStructFieldRef("partialMap");
+ // re-extract input key/value OIs
+ MapObjectInspector partialMapOI = (MapObjectInspector) partialMapField.getFieldObjectInspector();
+ this.inputKeyOI = HiveUtils.asPrimitiveObjectInspector(partialMapOI.getMapKeyObjectInspector());
+ this.inputValueOI = partialMapOI.getMapValueObjectInspector();
+
+ this.partialMapOI = ObjectInspectorFactory.getStandardMapObjectInspector(
+ ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+ ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+
+ this.sizeField = soi.getStructFieldRef("size");
+ this.sizeOI = (PrimitiveObjectInspector) sizeField.getFieldObjectInspector();
+ }
- DescendingMapEvaluator(int size) {
- super();
- this.size = size;
+ // initialize output
+ final ObjectInspector outputOI;
+ if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {// terminatePartial
+ outputOI = internalMergeOI(inputKeyOI, inputValueOI);
+ } else {// terminate
+ outputOI = ObjectInspectorFactory.getStandardMapObjectInspector(
+ ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI),
+ ObjectInspectorUtils.getStandardObjectInspector(inputValueOI));
+ }
+ return outputOI;
+ }
+
+ @Nonnull
+ private static StructObjectInspector internalMergeOI( // builds the struct inspector used for partial results: {partialMap, size}
+ @Nonnull PrimitiveObjectInspector keyOI, @Nonnull ObjectInspector valueOI) {
+ List<String> fieldNames = new ArrayList<String>();
+ List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+
+ fieldNames.add("partialMap"); // field 0: standard map inspector over the standard forms of keyOI/valueOI
+ fieldOIs.add(ObjectInspectorFactory.getStandardMapObjectInspector(
+ ObjectInspectorUtils.getStandardObjectInspector(keyOI),
+ ObjectInspectorUtils.getStandardObjectInspector(valueOI)));
+
+ fieldNames.add("size"); // field 1: writable int
+ fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
+ }
+
+ static class MapAggregationBuffer extends AbstractAggregationBuffer { // aggregation state used by this evaluator
+ @Nullable
+ Map<Object, Object> container; // null after reset(); iterate() allocates it through initBuffer()
+ int size; // bound given to initBuffer(); 0 after reset()
+
+ MapAggregationBuffer() {
+ super();
+ }
+ }
@Override
public void reset(@SuppressWarnings("deprecation") AggregationBuffer agg)
throws HiveException {
- if (size == 0) {
- ((MapAggregationBuffer) agg).container = new TreeMap<Object, Object>(
- Collections.reverseOrder());
- } else {
- ((MapAggregationBuffer) agg).container = new BoundedSortedMap<Object, Object>(size,
- true);
+ // clear the state; iterate()/merge() re-create the container through initBuffer()
+ // once they find it null
+ MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+ myagg.container = null;
+ myagg.size = 0;
+ }
+
+ /**
+ * Creates an aggregation buffer and puts it into the empty state that reset() produces.
+ */
+ @Override
+ public MapAggregationBuffer getNewAggregationBuffer() throws HiveException {
+ final MapAggregationBuffer buffer = new MapAggregationBuffer();
+ reset(buffer);
+ return buffer;
+ }
+
+ /**
+ * Adds one (key, value) row to the buffer. parameters[2] carries the requested size; its
+ * absolute value is used because tail-k is requested with a negative size.
+ */
+ @Override
+ public void iterate(@SuppressWarnings("deprecation") AggregationBuffer agg,
+ Object[] parameters) throws HiveException {
+ assert (parameters.length == 3);
+ final Object rawKey = parameters[0];
+ if (rawKey == null) {
+ // rows with a null key are ignored
+ return;
+ }
+
+ final Object stdKey = ObjectInspectorUtils.copyToStandardObject(rawKey, inputKeyOI);
+ final Object stdValue = ObjectInspectorUtils.copyToStandardObject(parameters[1], inputValueOI);
+ // size could be negative for tail-k
+ final int k = Math.abs(HiveUtils.getInt(parameters[2], sizeOI));
+
+ final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+ if (buffer.container == null) {
+ // first row since reset(): the container does not exist yet
+ initBuffer(buffer, k);
+ }
+ buffer.container.put(stdKey, stdValue);
+ }
+
+ /**
+ * Creates the container for the given buffer and records the size it was created with.
+ *
+ * @param agg buffer whose container is created
+ * @param size size handed to BoundedSortedMap; rejected by Preconditions.checkArgument
+ * unless it is greater than zero
+ */
+ void initBuffer(@Nonnull MapAggregationBuffer agg, @Nonnegative int size) {
+ Preconditions.checkArgument(size > 0, "size MUST be greater than zero: " + size);
+
+ // NOTE(review): the `true` flag presumably selects reverse (descending) key order, as in
+ // the removed DescendingMapEvaluator -- confirm against BoundedSortedMap
+ agg.container = new BoundedSortedMap<Object, Object>(size, true);
+ agg.size = size;
+ }
+
+ /**
+ * Emits the partial result as a two-element array: the container followed by the size.
+ */
+ @Override
+ public Object terminatePartial(@SuppressWarnings("deprecation") AggregationBuffer agg)
+ throws HiveException {
+ final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+ // element order follows the field order declared in internalMergeOI(): partialMap, size
+ return new Object[] {buffer.container, new IntWritable(buffer.size)};
+ }
+
+ /**
+ * Folds a partial result (struct holding the partial map and the size) into the buffer.
+ * A null partial or a null partial map leaves the buffer untouched.
+ */
+ @Override
+ public void merge(@SuppressWarnings("deprecation") AggregationBuffer agg, Object partial)
+ throws HiveException {
+ if (partial == null) {
+ return;
+ }
+
+ MapAggregationBuffer myagg = (MapAggregationBuffer) agg;
+
+ Object partialMapObj = internalMergeOI.getStructFieldData(partial, partialMapField);
+ Map<?, ?> partialMap = partialMapOI.getMap(HiveUtils.castLazyBinaryObject(partialMapObj));
+ if (partialMap == null) {
+ return;
+ }
+
+ // the container is created lazily, using the size carried inside the partial result
+ if (myagg.container == null) {
+ Object sizeObj = internalMergeOI.getStructFieldData(partial, sizeField);
+ int size = HiveUtils.getInt(sizeObj, sizeOI);
+ initBuffer(myagg, size);
+ }
+ // entries are copied to standard objects before being stored in the container
+ for (Map.Entry<?, ?> e : partialMap.entrySet()) {
+ Object key = ObjectInspectorUtils.copyToStandardObject(e.getKey(), inputKeyOI);
+ Object value = ObjectInspectorUtils.copyToStandardObject(e.getValue(), inputValueOI);
+ myagg.container.put(key, value);
}
}
+ /**
+ * Returns the aggregated container, or null when the buffer was never initialized.
+ */
+ @Override
+ @Nullable
+ public Map<Object, Object> terminate(@SuppressWarnings("deprecation") AggregationBuffer agg)
+ throws HiveException {
+ final MapAggregationBuffer buffer = (MapAggregationBuffer) agg;
+ final Map<Object, Object> result = buffer.container;
+ return result;
+ }
+
}
+
+ /**
+ * Tail-k variant of TopKOrderedMapEvaluator; it differs only in the container created by
+ * initBuffer().
+ */
+ public static class TailKOrderedMapEvaluator extends TopKOrderedMapEvaluator {
+
+ @Override
+ void initBuffer(MapAggregationBuffer agg, int size) {
+ // this override replaces the parent's initBuffer(), so its size guard is repeated here
+ Preconditions.checkArgument(size > 0, "size MUST be greater than zero: " + size);
+
+ agg.container = new BoundedSortedMap<Object, Object>(size);
+ agg.size = size;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
index afa8a58..8fba349 100644
--- a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
+++ b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
@@ -326,6 +326,18 @@ public final class HiveUtils {
return ObjectInspectorUtils.isConstantObjectInspector(oi) && isStringOI(oi);
}
+ /**
+ * @return true only if the inspector is a constant inspector and isIntOI() accepts it
+ */
+ public static boolean isConstInt(@Nonnull final ObjectInspector oi) {
+ if (!ObjectInspectorUtils.isConstantObjectInspector(oi)) {
+ return false;
+ }
+ return isIntOI(oi);
+ }
+
+ /**
+ * @return true only if the inspector is a constant inspector and isIntegerOI() accepts it
+ */
+ public static boolean isConstInteger(@Nonnull final ObjectInspector oi) {
+ if (!ObjectInspectorUtils.isConstantObjectInspector(oi)) {
+ return false;
+ }
+ return isIntegerOI(oi);
+ }
+
+ /**
+ * @return true only if the inspector is a constant inspector and isBooleanOI() accepts it
+ */
+ public static boolean isConstBoolean(@Nonnull final ObjectInspector oi) {
+ if (!ObjectInspectorUtils.isConstantObjectInspector(oi)) {
+ return false;
+ }
+ return isBooleanOI(oi);
+ }
+
public static boolean isPrimitiveTypeInfo(@Nonnull TypeInfo typeInfo) {
return typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE;
}
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
index 61642f1..38bc5ae 100644
--- a/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
+++ b/core/src/test/java/hivemall/tools/map/UDAFToOrderedMapTest.java
@@ -18,10 +18,12 @@
*/
package hivemall.tools.map;
-import hivemall.tools.map.UDAFToOrderedMap.AscendingMapEvaluator;
-import hivemall.tools.map.UDAFToOrderedMap.DescendingMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.NaturalOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.ReverseOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TailKOrderedMapEvaluator;
+import hivemall.tools.map.UDAFToOrderedMap.TopKOrderedMapEvaluator;
-import java.util.SortedMap;
+import java.util.Map;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -33,8 +35,8 @@ public class UDAFToOrderedMapTest {
@Test
public void testNaturalOrder() throws Exception {
- AscendingMapEvaluator evaluator = new AscendingMapEvaluator(0);
- AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ NaturalOrderedMapEvaluator evaluator = new NaturalOrderedMapEvaluator();
+ NaturalOrderedMapEvaluator.MapAggregationBuffer agg = (NaturalOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -50,7 +52,7 @@ public class UDAFToOrderedMapTest {
evaluator.iterate(agg, new Object[] {keys[i], values[i]});
}
- SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Map<Object, Object> res = evaluator.terminate(agg);
Object[] sortedValues = res.values().toArray();
Assert.assertEquals(3, sortedValues.length);
@@ -63,8 +65,8 @@ public class UDAFToOrderedMapTest {
@Test
public void testReverseOrder() throws Exception {
- DescendingMapEvaluator evaluator = new DescendingMapEvaluator(0);
- DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ ReverseOrderedMapEvaluator evaluator = new ReverseOrderedMapEvaluator();
+ ReverseOrderedMapEvaluator.MapAggregationBuffer agg = (ReverseOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -81,7 +83,7 @@ public class UDAFToOrderedMapTest {
evaluator.iterate(agg, new Object[] {keys[i], values[i]});
}
- SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Map<Object, Object> res = evaluator.terminate(agg);
Object[] sortedValues = res.values().toArray();
Assert.assertEquals(3, sortedValues.length);
@@ -94,9 +96,8 @@ public class UDAFToOrderedMapTest {
@Test
public void testTopK() throws Exception {
- int size = 2;
- DescendingMapEvaluator evaluator = new DescendingMapEvaluator(size);
- DescendingMapEvaluator.MapAggregationBuffer agg = (DescendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ TopKOrderedMapEvaluator evaluator = new TopKOrderedMapEvaluator();
+ TopKOrderedMapEvaluator.MapAggregationBuffer agg = (TopKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -105,6 +106,7 @@ public class UDAFToOrderedMapTest {
final double[] keys = new double[] {0.7, 0.5, 0.8};
final String[] values = new String[] {"banana", "apple", "candy"};
+ int size = 2;
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
evaluator.reset(agg);
@@ -113,7 +115,7 @@ public class UDAFToOrderedMapTest {
evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
}
- SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Map<Object, Object> res = evaluator.terminate(agg);
Object[] sortedValues = res.values().toArray();
Assert.assertEquals(size, sortedValues.length);
@@ -125,9 +127,8 @@ public class UDAFToOrderedMapTest {
@Test
public void testTailK() throws Exception {
- int size = -2;
- AscendingMapEvaluator evaluator = new AscendingMapEvaluator(Math.abs(size));
- AscendingMapEvaluator.MapAggregationBuffer agg = (AscendingMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
+ TailKOrderedMapEvaluator evaluator = new TailKOrderedMapEvaluator();
+ TailKOrderedMapEvaluator.MapAggregationBuffer agg = (TailKOrderedMapEvaluator.MapAggregationBuffer) evaluator.getNewAggregationBuffer();
ObjectInspector[] inputOIs = new ObjectInspector[] {
PrimitiveObjectInspectorFactory.javaDoubleObjectInspector,
@@ -136,6 +137,7 @@ public class UDAFToOrderedMapTest {
final double[] keys = new double[] {0.7, 0.5, 0.8};
final String[] values = new String[] {"banana", "apple", "candy"};
+ int size = -2;
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
evaluator.reset(agg);
@@ -144,7 +146,7 @@ public class UDAFToOrderedMapTest {
evaluator.iterate(agg, new Object[] {keys[i], values[i], size});
}
- SortedMap<Object, Object> res = (SortedMap<Object, Object>) evaluator.terminate(agg);
+ Map<Object, Object> res = evaluator.terminate(agg);
Object[] sortedValues = res.values().toArray();
Assert.assertEquals(Math.abs(size), sortedValues.length);
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/688daa5f/docs/gitbook/misc/generic_funcs.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/misc/generic_funcs.md b/docs/gitbook/misc/generic_funcs.md
index 66b30e2..03e1ef3 100644
--- a/docs/gitbook/misc/generic_funcs.md
+++ b/docs/gitbook/misc/generic_funcs.md
@@ -85,7 +85,36 @@ This page describes a list of useful Hivemall generic functions.
## List UDAF
-- `to_ordered_list(value [, const string options])` or `to_ordered_list(value, key [, const string options])` - Return list of values sorted by value itself or specific key
+- `to_ordered_list(PRIMITIVE value [, const string options])` or `to_ordered_list(PRIMITIVE value, PRIMITIVE key [, const string options])` - Return list of values sorted by value itself or specific key
+
+ ```sql
+ with t as (
+ select 5 as key, 'apple' as value
+ union all
+ select 3 as key, 'banana' as value
+ union all
+ select 4 as key, 'candy' as value
+ union all
+ select 2 as key, 'donut' as value
+ union all
+ select 3 as key, 'egg' as value
+ )
+ select -- expected output
+ to_ordered_list(value, key, '-reverse'), -- [apple, candy, (banana, egg | egg, banana), donut] (reverse order)
+ to_ordered_list(value, key, '-k 2'), -- [apple, candy] (top-k)
+ to_ordered_list(value, key, '-k 100'), -- [apple, candy, (banana, egg | egg, banana), donut]
+ to_ordered_list(value, key, '-k 2 -reverse'), -- [donut, (banana | egg)] (reverse top-k = tail-k)
+ to_ordered_list(value, key), -- [donut, (banana, egg | egg, banana), candy, apple] (natural order)
+ to_ordered_list(value, key, '-k -2'), -- [donut, (banana | egg)] (tail-k)
+ to_ordered_list(value, key, '-k -100'), -- [donut, (banana, egg | egg, banana), candy, apple]
+ to_ordered_list(value, key, '-k -2 -reverse'), -- [apple, candy] (reverse tail-k = top-k)
+ to_ordered_list(value, '-k 2'), -- [egg, donut] (alphabetically)
+ to_ordered_list(key, '-k -2 -reverse'), -- [5, 4] (top-2 keys)
+ to_ordered_list(key) -- [2, 3, 3, 4, 5] (natural ordered keys)
+ from
+ t
+ ;
+ ```
# Bitset functions
@@ -147,6 +176,29 @@ The compression level must be in range [-1,9]
- `to_ordered_map(key, value [, const int k|const boolean reverseOrder=false])` - Convert two aggregated columns into an ordered key-value map
+ ```sql
+ with t as (
+ select 10 as key, 'apple' as value
+ union all
+ select 3 as key, 'banana' as value
+ union all
+ select 4 as key, 'candy' as value
+ )
+ select
+ to_ordered_map(key, value, true), -- {10:"apple",4:"candy",3:"banana"} (reverse)
+ to_ordered_map(key, value, 1), -- {10:"apple"} (top-1)
+ to_ordered_map(key, value, 2), -- {10:"apple",4:"candy"} (top-2)
+ to_ordered_map(key, value, 3), -- {10:"apple",4:"candy",3:"banana"} (top-3)
+ to_ordered_map(key, value, 100), -- {10:"apple",4:"candy",3:"banana"} (top-100)
+ to_ordered_map(key, value), -- {3:"banana",4:"candy",10:"apple"} (natural)
+ to_ordered_map(key, value, -1), -- {3:"banana"} (tail-1)
+ to_ordered_map(key, value, -2), -- {3:"banana",4:"candy"} (tail-2)
+ to_ordered_map(key, value, -3), -- {3:"banana",4:"candy",10:"apple"} (tail-3)
+ to_ordered_map(key, value, -100) -- {3:"banana",4:"candy",10:"apple"} (tail-100)
+ from t
+ ;
+ ```
+
# MapReduce functions
- `rowid()` - Returns a generated row id of a form {TASK_ID}-{SEQUENCE_NUMBER}