Posted to issues@hivemall.apache.org by takuti <gi...@git.apache.org> on 2017/06/02 22:29:52 UTC

[GitHub] incubator-hivemall pull request #84: [WIP][HIVEMALL-19] Support DIMSUM for a...

GitHub user takuti opened a pull request:

    https://github.com/apache/incubator-hivemall/pull/84

    [WIP][HIVEMALL-19] Support DIMSUM for approx. all-pairs similarity

    ## What changes were proposed in this pull request?
    
    Support DIMSUM (Dimension Independent Matrix Square using MapReduce) for approximate all-pairs similarity computation. This makes item-based collaborative filtering (CF) more efficient.
    
    https://stanford.edu/~rezab/papers/dimsum.pdf
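
    For readers new to the paper, the core of DIMSUM as reflected in this patch (notation mine, summarized from the paper above and the mapper code quoted later in this thread): each column j of the input matrix A, with norm ||c_j||, is sampled with probability

        p_j = min(1, sqrt(gamma) / ||c_j||)

    and, for every sampled pair (j, k) of non-zero entries within a row i, the mapper emits the scaled partial product

        b_jk = (a_ij / min(sqrt(gamma), ||c_j||)) * (a_ik / min(sqrt(gamma), ||c_k||))

    Summing b_jk over all rows on the reducer side then approximates the cosine similarity between columns j and k, with `gamma` defaulting to `10 * log(numCols) / threshold`.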
    
    ## What type of PR is it?
    
    Feature
    
    ## What is the Jira issue?
    
    - https://issues.apache.org/jira/browse/HIVEMALL-19
    
    ## How was this patch tested?
    
    - Unit tests
    - Manual tests on EMR
    
    ---
    
    ### TODO
    
    - [ ] Documentation
    - [ ] Evaluate on larger data e.g. MovieLens

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/takuti/incubator-hivemall DIMSUM

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-hivemall/pull/84.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #84
    
----
commit 1a661cef229a508655352c360a2890bd66da1ab0
Author: Takuya Kitazawa <k....@gmail.com>
Date:   2017-06-01T03:30:08Z

    Add `l2_norm` UDAF

commit c19abc5b8e603b65595346c6fb76329a09a1e02c
Author: Takuya Kitazawa <k....@gmail.com>
Date:   2017-06-01T09:10:16Z

    Implement DIMSUM mapper

commit 44367b29056752b32bbbd9601e9500fa6398e8ef
Author: Takuya Kitazawa <k....@gmail.com>
Date:   2017-06-02T01:58:40Z

    Make symmetric output (j, k), (k, j) configureable

commit a6e854c856ce3deef46e6b8b0293497d57e82901
Author: Takuya Kitazawa <k....@gmail.com>
Date:   2017-06-02T03:16:23Z

    Support string feature

commit 97cb91d8fef0cd2f85657a02bd9a2505d7551337
Author: Takuya Kitazawa <k....@gmail.com>
Date:   2017-06-02T03:28:22Z

    Fix so that default `gamma` is computed correctly

commit b42b65b1cb358a89cd402f90b5ec3d6c79ff465c
Author: Takuya Kitazawa <k....@gmail.com>
Date:   2017-06-02T07:04:25Z

    Add unit test for DIMSUMMapperUDTF

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    
    [![Coverage Status](https://coveralls.io/builds/11842859/badge)](https://coveralls.io/builds/11842859)
    
    Coverage increased (+0.4%) to 39.107% when pulling **c4c4e8c67b93a572f4ee00fd80b5265079f803a9 on takuti:DIMSUM** into **10e7d450fa8257efc5d614957fda514b2b91fdee on apache:master**.



---

[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-hivemall/pull/84


---

[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    
    [![Coverage Status](https://coveralls.io/builds/11828591/badge)](https://coveralls.io/builds/11828591)
    
    Coverage increased (+0.4%) to 39.105% when pulling **e96d860de3f11f31dae9cb8171658e4d0be7482b on takuti:DIMSUM** into **10e7d450fa8257efc5d614957fda514b2b91fdee on apache:master**.



---

[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by myui <gi...@git.apache.org>.
Github user myui commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r120307947
  
    --- Diff: core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.knn.similarity;
    +
    +import hivemall.UDTFWithOptions;
    +import hivemall.fm.Feature;
    +import hivemall.fm.IntFeature;
    +import hivemall.fm.StringFeature;
    +import hivemall.math.random.PRNG;
    +import hivemall.math.random.RandomNumberGeneratorFactory;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Primitives;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.Text;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Description(
    +        name = "dimsum_mapper",
    +        value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const string options]) "
    +                + "- Returns column-wise partial similarities")
    +public class DIMSUMMapperUDTF extends UDTFWithOptions {
    +    private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class);
    +
    +    protected ListObjectInspector rowOI;
    +    protected MapObjectInspector colNormsOI;
    +
    +    @Nullable
    +    protected Feature[] probes;
    +
    +    @Nonnull
    +    protected PRNG rnd;
    +
    +    protected double threshold;
    +    protected double sqrtGamma;
    +    protected boolean symmetricOutput;
    +    protected boolean parseFeatureAsInt;
    +
    +    protected Map<Object, Double> colNorms;
    +    protected Map<Object, Double> colProbs;
    +
    +    @Override
    +    protected Options getOptions() {
    +        Options opts = new Options();
    +        opts.addOption("th", "threshold", true,
    +            "Theoretically, similarities above this threshold are estimated [default: 0.5]");
    +        opts.addOption("g", "gamma", true,
    +            "Oversampling parameter; if `gamma` is given, `threshold` will be ignored"
    +                + " [default: 10 * log(numCols) / threshold]");
    +        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
    +            "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted");
    +        opts.addOption("int_feature", "feature_as_integer", false,
    +            "Parse a feature (i.e. column ID) as integer");
    +        return opts;
    +    }
    +
    +    @Override
    +    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +            throws UDFArgumentException {
    +        double threshold = 0.5d;
    +        double gamma = Double.POSITIVE_INFINITY;
    +        boolean symmetricOutput = true;
    +        boolean parseFeatureAsInt = false;
    +
    +        CommandLine cl = null;
    +        if (argOIs.length >= 3) {
    +            String rawArgs = HiveUtils.getConstString(argOIs[2]);
    +            cl = parseOptions(rawArgs);
    +            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold);
    +            if (threshold < 0.f || threshold >= 1.f) {
    +                throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold);
    +            }
    +            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
    +            if (gamma <= 1.d) {
    +                throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma);
    +            }
    +            symmetricOutput = !cl.hasOption("disable_symmetric_output");
    +            parseFeatureAsInt = cl.hasOption("feature_as_integer");
    +        }
    +
    +        this.threshold = threshold;
    +        this.sqrtGamma = Math.sqrt(gamma);
    +        this.symmetricOutput = symmetricOutput;
    +        this.parseFeatureAsInt = parseFeatureAsInt;
    +
    +        return cl;
    +    }
    +
    +    @Override
    +    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    +        if (argOIs.length != 2 && argOIs.length != 3) {
    +            throw new UDFArgumentException(
    +                getClass().getSimpleName()
    +                        + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms "
    +                        + "[, CONSTANT STRING options]: "
    +                        + Arrays.toString(argOIs));
    +        }
    +
    +        this.rowOI = HiveUtils.asListOI(argOIs[0]);
    +        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
    +
    +        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
    +
    +        processOptions(argOIs);
    +
    +        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
    +        this.colNorms = null;
    +        this.colProbs = null;
    +
    +        ArrayList<String> fieldNames = new ArrayList<String>();
    +        fieldNames.add("j");
    +        fieldNames.add("k");
    +        fieldNames.add("b_jk");
    +
    +        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    +        if (parseFeatureAsInt) {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +        } else {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +        }
    +        fieldOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    +
    +        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    +    }
    +
    +    @Override
    +    public void process(Object[] args) throws HiveException {
    +        Feature[] row = parseFeatures(args[0]);
    +        if (row == null) {
    +            return;
    +        }
    +        this.probes = row;
    +
    +        // since the 2nd argument (column norms) is consistent,
    +        // column-related values, `colNorms` and `colProbs`, should be cached
    +        if (colNorms == null || colProbs == null) {
    +            final int numCols = colNormsOI.getMapSize(args[1]);
    +
    +            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold`
    +                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. always accept <j, k> pairs
    +                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold);
    +                }
    +            }
    +
    +            this.colNorms = new HashMap<Object, Double>(numCols);
    +            this.colProbs = new HashMap<Object, Double>(numCols);
    +            final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]);
    +            for (Map.Entry<Object, Object> e : m.entrySet()) {
    +                Object j = e.getKey();
    +                if (parseFeatureAsInt) {
    +                    j = HiveUtils.asJavaInt(j);
    +                } else {
    +                    j = j.toString();
    +                }
    +
    +                double norm = HiveUtils.asJavaDouble(e.getValue());
    +                if (norm == 0.d) { // avoid zero-division
    +                    norm = 1.d;
    +                }
    +
    +                colNorms.put(j, norm);
    +
    +                double p = Math.min(1.d, sqrtGamma / norm);
    +                colProbs.put(j, p);
    +            }
    +        }
    +
    +        if (parseFeatureAsInt) {
    +            forwardAsIntFeature(row);
    +        } else {
    +            forwardAsStringFeature(row);
    +        }
    +    }
    +
    +    private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            int j = row[i].getFeatureIndex();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new IntFeature(j, scaled);
    +        }
    +
    +        final IntWritable jWritable = new IntWritable();
    +        final IntWritable kWritable = new IntWritable();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            int j = rowScaled[ij].getFeatureIndex();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    +                for (int ik = ij + 1; ik < length; ik++) {
    +                    int k = rowScaled[ik].getFeatureIndex();
    +                    double kVal = rowScaled[ik].getValue();
    +
    +                    if (kVal != 0.d && rnd.nextDouble() < colProbs.get(k)) {
    +                        // compute b_jk
    +                        bWritable.set((float) (jVal * kVal));
    +
    +                        if (symmetricOutput) {
    +                            // (j, k); similarity matrix is symmetric
    +                            jWritable.set(j);
    +                            kWritable.set(k);
    +                            forward(forwardObjs);
    +
    +                            // (k, j)
    +                            jWritable.set(k);
    +                            kWritable.set(j);
    +                            forward(forwardObjs);
    +                        } else {
    +                            if (j < k) {
    +                                jWritable.set(j);
    +                                kWritable.set(k);
    +                            } else {
    +                                jWritable.set(k);
    +                                kWritable.set(j);
    +                            }
    +                            forward(forwardObjs);
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    private void forwardAsStringFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            String j = row[i].getFeature();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new StringFeature(j, scaled);
    +        }
    +
    +        final Text jWritable = new Text();
    +        final Text kWritable = new Text();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            String j = rowScaled[ij].getFeature();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    --- End diff --
    
    NPE in `colProbs.get(j)`
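    
    For illustration, one null-safe way to guard this lookup (a sketch only; treating a column that is missing from `colNorms`/`colProbs` as probability 0, i.e. never sampled, is my assumption rather than something settled in this thread; `Map` is `java.util.Map` as already imported by the UDTF):
    
        // hypothetical helper; the name is mine
        private static double probOf(Map<Object, Double> colProbs, Object j) {
            Double p = colProbs.get(j);
            return (p == null) ? 0.d : p.doubleValue();
        }
        // usage in the loop: if (jVal != 0.d && rnd.nextDouble() < probOf(colProbs, j)) { ... }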


---

[GitHub] incubator-hivemall pull request #84: [WIP][HIVEMALL-19] Support DIMSUM for a...

Posted by takuti <gi...@git.apache.org>.
Github user takuti commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r119979651
  
    --- Diff: core/src/main/java/hivemall/tools/math/L2NormUDAF.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.tools.math;
    +
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDAF;
    +import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.io.DoubleWritable;
    +
    +@SuppressWarnings("deprecation")
    +@Description(name = "l2_norm",
    +        value = "_FUNC_(double xi) - Return a Root Mean Squared Error")
    --- End diff --
    
    Wrong description
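    
    The description was evidently copied from an RMSE UDAF; a corrected annotation would read something like the following (wording is mine, not the committed fix):
    
        @Description(name = "l2_norm",
                value = "_FUNC_(double xi) - Returns the L2 norm of the given values")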


---

[GitHub] incubator-hivemall issue #84: [WIP][HIVEMALL-19] Support DIMSUM for approx. ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    
    [![Coverage Status](https://coveralls.io/builds/11810768/badge)](https://coveralls.io/builds/11810768)
    
    Coverage increased (+0.4%) to 39.057% when pulling **b42b65b1cb358a89cd402f90b5ec3d6c79ff465c on takuti:DIMSUM** into **10e7d450fa8257efc5d614957fda514b2b91fdee on apache:master**.



---

[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    
    [![Coverage Status](https://coveralls.io/builds/11842898/badge)](https://coveralls.io/builds/11842898)
    
    Coverage increased (+0.4%) to 39.114% when pulling **c4c4e8c67b93a572f4ee00fd80b5265079f803a9 on takuti:DIMSUM** into **10e7d450fa8257efc5d614957fda514b2b91fdee on apache:master**.



---

[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by myui <gi...@git.apache.org>.
Github user myui commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    LGTM. merged! 
    
    It would be good to have a tutorial that uses the MovieLens dataset for DIMSUM.


---

[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by myui <gi...@git.apache.org>.
Github user myui commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r120307983
  
    --- Diff: core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.knn.similarity;
    +
    +import hivemall.UDTFWithOptions;
    +import hivemall.fm.Feature;
    +import hivemall.fm.IntFeature;
    +import hivemall.fm.StringFeature;
    +import hivemall.math.random.PRNG;
    +import hivemall.math.random.RandomNumberGeneratorFactory;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Primitives;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.Text;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Description(
    +        name = "dimsum_mapper",
    +        value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const string options]) "
    +                + "- Returns column-wise partial similarities")
    +public class DIMSUMMapperUDTF extends UDTFWithOptions {
    +    private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class);
    +
    +    protected ListObjectInspector rowOI;
    +    protected MapObjectInspector colNormsOI;
    +
    +    @Nullable
    +    protected Feature[] probes;
    +
    +    @Nonnull
    +    protected PRNG rnd;
    +
    +    protected double threshold;
    +    protected double sqrtGamma;
    +    protected boolean symmetricOutput;
    +    protected boolean parseFeatureAsInt;
    +
    +    protected Map<Object, Double> colNorms;
    +    protected Map<Object, Double> colProbs;
    +
    +    @Override
    +    protected Options getOptions() {
    +        Options opts = new Options();
    +        opts.addOption("th", "threshold", true,
    +            "Theoretically, similarities above this threshold are estimated [default: 0.5]");
    +        opts.addOption("g", "gamma", true,
    +            "Oversampling parameter; if `gamma` is given, `threshold` will be ignored"
    +                + " [default: 10 * log(numCols) / threshold]");
    +        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
    +            "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted");
    +        opts.addOption("int_feature", "feature_as_integer", false,
    +            "Parse a feature (i.e. column ID) as integer");
    +        return opts;
    +    }
    +
    +    @Override
    +    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +            throws UDFArgumentException {
    +        double threshold = 0.5d;
    +        double gamma = Double.POSITIVE_INFINITY;
    +        boolean symmetricOutput = true;
    +        boolean parseFeatureAsInt = false;
    +
    +        CommandLine cl = null;
    +        if (argOIs.length >= 3) {
    +            String rawArgs = HiveUtils.getConstString(argOIs[2]);
    +            cl = parseOptions(rawArgs);
    +            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold);
    +            if (threshold < 0.f || threshold >= 1.f) {
    +                throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold);
    +            }
    +            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
    +            if (gamma <= 1.d) {
    +                throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma);
    +            }
    +            symmetricOutput = !cl.hasOption("disable_symmetric_output");
    +            parseFeatureAsInt = cl.hasOption("feature_as_integer");
    +        }
    +
    +        this.threshold = threshold;
    +        this.sqrtGamma = Math.sqrt(gamma);
    +        this.symmetricOutput = symmetricOutput;
    +        this.parseFeatureAsInt = parseFeatureAsInt;
    +
    +        return cl;
    +    }
    +
    +    @Override
    +    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    +        if (argOIs.length != 2 && argOIs.length != 3) {
    +            throw new UDFArgumentException(
    +                getClass().getSimpleName()
    +                        + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms "
    +                        + "[, CONSTANT STRING options]: "
    +                        + Arrays.toString(argOIs));
    +        }
    +
    +        this.rowOI = HiveUtils.asListOI(argOIs[0]);
    +        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
    +
    +        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
    +
    +        processOptions(argOIs);
    +
    +        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
    +        this.colNorms = null;
    +        this.colProbs = null;
    +
    +        ArrayList<String> fieldNames = new ArrayList<String>();
    +        fieldNames.add("j");
    +        fieldNames.add("k");
    +        fieldNames.add("b_jk");
    +
    +        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    +        if (parseFeatureAsInt) {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +        } else {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +        }
    +        fieldOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    +
    +        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    +    }
    +
    +    @Override
    +    public void process(Object[] args) throws HiveException {
    +        Feature[] row = parseFeatures(args[0]);
    +        if (row == null) {
    +            return;
    +        }
    +        this.probes = row;
    +
    +        // since the 2nd argument (column norms) is consistent,
    +        // column-related values, `colNorms` and `colProbs`, should be cached
    +        if (colNorms == null || colProbs == null) {
    +            final int numCols = colNormsOI.getMapSize(args[1]);
    +
    +            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold`
    +                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. always accept <j, k> pairs
    +                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold);
    +                }
    +            }
    +
    +            this.colNorms = new HashMap<Object, Double>(numCols);
    +            this.colProbs = new HashMap<Object, Double>(numCols);
    +            final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]);
    +            for (Map.Entry<Object, Object> e : m.entrySet()) {
    +                Object j = e.getKey();
    +                if (parseFeatureAsInt) {
    +                    j = HiveUtils.asJavaInt(j);
    +                } else {
    +                    j = j.toString();
    +                }
    +
    +                double norm = HiveUtils.asJavaDouble(e.getValue());
    +                if (norm == 0.d) { // avoid zero-division
    +                    norm = 1.d;
    +                }
    +
    +                colNorms.put(j, norm);
    +
    +                double p = Math.min(1.d, sqrtGamma / norm);
    +                colProbs.put(j, p);
    +            }
    +        }
    +
    +        if (parseFeatureAsInt) {
    +            forwardAsIntFeature(row);
    +        } else {
    +            forwardAsStringFeature(row);
    +        }
    +    }
    +
    +    private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            int j = row[i].getFeatureIndex();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new IntFeature(j, scaled);
    +        }
    +
    +        final IntWritable jWritable = new IntWritable();
    +        final IntWritable kWritable = new IntWritable();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            int j = rowScaled[ij].getFeatureIndex();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    +                for (int ik = ij + 1; ik < length; ik++) {
    +                    int k = rowScaled[ik].getFeatureIndex();
    +                    double kVal = rowScaled[ik].getValue();
    +
    +                    if (kVal != 0.d && rnd.nextDouble() < colProbs.get(k)) {
    +                        // compute b_jk
    +                        bWritable.set((float) (jVal * kVal));
    +
    +                        if (symmetricOutput) {
    +                            // (j, k); similarity matrix is symmetric
    +                            jWritable.set(j);
    +                            kWritable.set(k);
    +                            forward(forwardObjs);
    +
    +                            // (k, j)
    +                            jWritable.set(k);
    +                            kWritable.set(j);
    +                            forward(forwardObjs);
    +                        } else {
    +                            if (j < k) {
    +                                jWritable.set(j);
    +                                kWritable.set(k);
    +                            } else {
    +                                jWritable.set(k);
    +                                kWritable.set(j);
    +                            }
    +                            forward(forwardObjs);
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    private void forwardAsStringFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            String j = row[i].getFeature();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new StringFeature(j, scaled);
    +        }
    +
    +        final Text jWritable = new Text();
    +        final Text kWritable = new Text();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            String j = rowScaled[ij].getFeature();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    +                for (int ik = ij + 1; ik < length; ik++) {
    +                    String k = rowScaled[ik].getFeature();
    +                    double kVal = rowScaled[ik].getValue();
    +
    +                    if (kVal != 0.d && rnd.nextDouble() < colProbs.get(k)) {
    --- End diff --
    
    NPE in `colProbs.get(k)`


---

[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by myui <gi...@git.apache.org>.
Github user myui commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r120303813
  
    --- Diff: core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.knn.similarity;
    +
    +import hivemall.UDTFWithOptions;
    +import hivemall.fm.Feature;
    +import hivemall.fm.IntFeature;
    +import hivemall.fm.StringFeature;
    +import hivemall.math.random.PRNG;
    +import hivemall.math.random.RandomNumberGeneratorFactory;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Primitives;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.Text;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Description(
    +        name = "dimsum_mapper",
    +        value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const string options]) "
    +                + "- Returns column-wise partial similarities")
    +public class DIMSUMMapperUDTF extends UDTFWithOptions {
    +    private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class);
    +
    +    protected ListObjectInspector rowOI;
    +    protected MapObjectInspector colNormsOI;
    +
    +    @Nullable
    +    protected Feature[] probes;
    +
    +    @Nonnull
    +    protected PRNG rnd;
    +
    +    protected double threshold;
    +    protected double sqrtGamma;
    +    protected boolean symmetricOutput;
    +    protected boolean parseFeatureAsInt;
    +
    +    protected Map<Object, Double> colNorms;
    +    protected Map<Object, Double> colProbs;
    +
    +    @Override
    +    protected Options getOptions() {
    +        Options opts = new Options();
    +        opts.addOption("th", "threshold", true,
    +            "Theoretically, similarities above this threshold are estimated [default: 0.5]");
    +        opts.addOption("g", "gamma", true,
    +            "Oversampling parameter; if `gamma` is given, `threshold` will be ignored"
    +                + " [default: 10 * log(numCols) / threshold]");
    +        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
    +            "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted");
    +        opts.addOption("int_feature", "feature_as_integer", false,
    +            "Parse a feature (i.e. column ID) as integer");
    +        return opts;
    +    }
    +
    +    @Override
    +    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +            throws UDFArgumentException {
    +        double threshold = 0.5d;
    +        double gamma = Double.POSITIVE_INFINITY;
    +        boolean symmetricOutput = true;
    +        boolean parseFeatureAsInt = false;
    +
    +        CommandLine cl = null;
    +        if (argOIs.length >= 3) {
    +            String rawArgs = HiveUtils.getConstString(argOIs[2]);
    +            cl = parseOptions(rawArgs);
    +            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold);
    +            if (threshold < 0.f || threshold >= 1.f) {
    +                throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold);
    +            }
    +            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
    +            if (gamma <= 1.d) {
    +                throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma);
    +            }
    +            symmetricOutput = !cl.hasOption("disable_symmetric_output");
    +            parseFeatureAsInt = cl.hasOption("feature_as_integer");
    +        }
    +
    +        this.threshold = threshold;
    +        this.sqrtGamma = Math.sqrt(gamma);
    +        this.symmetricOutput = symmetricOutput;
    +        this.parseFeatureAsInt = parseFeatureAsInt;
    +
    +        return cl;
    +    }
    +
    +    @Override
    +    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    +        if (argOIs.length != 2 && argOIs.length != 3) {
    +            throw new UDFArgumentException(
    +                getClass().getSimpleName()
    +                        + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms "
    +                        + "[, CONSTANT STRING options]: "
    +                        + Arrays.toString(argOIs));
    +        }
    +
    +        this.rowOI = HiveUtils.asListOI(argOIs[0]);
    +        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
    +
    +        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
    +
    +        processOptions(argOIs);
    +
    +        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
    +        this.colNorms = null;
    +        this.colProbs = null;
    +
    +        ArrayList<String> fieldNames = new ArrayList<String>();
    +        fieldNames.add("j");
    +        fieldNames.add("k");
    +        fieldNames.add("b_jk");
    +
    +        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    +        if (parseFeatureAsInt) {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +        } else {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +        }
    +        fieldOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    +
    +        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    +    }
    +
    +    @Override
    +    public void process(Object[] args) throws HiveException {
    +        Feature[] row = parseFeatures(args[0]);
    +        if (row == null) {
    +            return;
    +        }
    +        this.probes = row;
    +
    +        // since the 2nd argument (column norms) is consistent,
    +        // column-related values, `colNorms` and `colProbs`, should be cached
    +        if (colNorms == null || colProbs == null) {
    +            final int numCols = colNormsOI.getMapSize(args[1]);
    +
    +            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold`
    +                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. always accept <j, k> pairs
    +                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold);
    +                }
    +            }
    +
    +            this.colNorms = new HashMap<Object, Double>(numCols);
    +            this.colProbs = new HashMap<Object, Double>(numCols);
    +            final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]);
    +            for (Map.Entry<Object, Object> e : m.entrySet()) {
    +                Object j = e.getKey();
    +                if (parseFeatureAsInt) {
    +                    j = HiveUtils.asJavaInt(j);
    +                } else {
    +                    j = j.toString();
    +                }
    +
    +                double norm = HiveUtils.asJavaDouble(e.getValue());
    +                if (norm == 0.d) { // avoid zero-division
    +                    norm = 1.d;
    +                }
    +
    +                colNorms.put(j, norm);
    +
    +                double p = Math.min(1.d, sqrtGamma / norm);
    +                colProbs.put(j, p);
    +            }
    +        }
    +
    +        if (parseFeatureAsInt) {
    +            forwardAsIntFeature(row);
    +        } else {
    +            forwardAsStringFeature(row);
    +        }
    +    }
    +
    +    private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            int j = row[i].getFeatureIndex();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new IntFeature(j, scaled);
    +        }
    +
    +        final IntWritable jWritable = new IntWritable();
    +        final IntWritable kWritable = new IntWritable();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            int j = rowScaled[ij].getFeatureIndex();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    --- End diff --
    
    NPE in `colProbs.get(j)` could happen.


---

[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by myui <gi...@git.apache.org>.
Github user myui commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r120303659
  
    --- Diff: core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.knn.similarity;
    +
    +import hivemall.UDTFWithOptions;
    +import hivemall.fm.Feature;
    +import hivemall.fm.IntFeature;
    +import hivemall.fm.StringFeature;
    +import hivemall.math.random.PRNG;
    +import hivemall.math.random.RandomNumberGeneratorFactory;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Primitives;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.Text;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Description(
    +        name = "dimsum_mapper",
    +        value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const string options]) "
    +                + "- Returns column-wise partial similarities")
    +public class DIMSUMMapperUDTF extends UDTFWithOptions {
    +    private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class);
    +
    +    protected ListObjectInspector rowOI;
    +    protected MapObjectInspector colNormsOI;
    +
    +    @Nullable
    +    protected Feature[] probes;
    +
    +    @Nonnull
    +    protected PRNG rnd;
    +
    +    protected double threshold;
    +    protected double sqrtGamma;
    +    protected boolean symmetricOutput;
    +    protected boolean parseFeatureAsInt;
    +
    +    protected Map<Object, Double> colNorms;
    +    protected Map<Object, Double> colProbs;
    +
    +    @Override
    +    protected Options getOptions() {
    +        Options opts = new Options();
    +        opts.addOption("th", "threshold", true,
    +            "Theoretically, similarities above this threshold are estimated [default: 0.5]");
    +        opts.addOption("g", "gamma", true,
    +            "Oversampling parameter; if `gamma` is given, `threshold` will be ignored"
    +                + " [default: 10 * log(numCols) / threshold]");
    +        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
    +            "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted");
    +        opts.addOption("int_feature", "feature_as_integer", false,
    +            "Parse a feature (i.e. column ID) as integer");
    +        return opts;
    +    }
    +
    +    @Override
    +    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +            throws UDFArgumentException {
    +        double threshold = 0.5d;
    +        double gamma = Double.POSITIVE_INFINITY;
    +        boolean symmetricOutput = true;
    +        boolean parseFeatureAsInt = false;
    +
    +        CommandLine cl = null;
    +        if (argOIs.length >= 3) {
    +            String rawArgs = HiveUtils.getConstString(argOIs[2]);
    +            cl = parseOptions(rawArgs);
    +            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold);
    +            if (threshold < 0.f || threshold >= 1.f) {
    +                throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold);
    +            }
    +            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
    +            if (gamma <= 1.d) {
    +                throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma);
    +            }
    +            symmetricOutput = !cl.hasOption("disable_symmetric_output");
    +            parseFeatureAsInt = cl.hasOption("feature_as_integer");
    +        }
    +
    +        this.threshold = threshold;
    +        this.sqrtGamma = Math.sqrt(gamma);
    +        this.symmetricOutput = symmetricOutput;
    +        this.parseFeatureAsInt = parseFeatureAsInt;
    +
    +        return cl;
    +    }
    +
    +    @Override
    +    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    +        if (argOIs.length != 2 && argOIs.length != 3) {
    +            throw new UDFArgumentException(
    +                getClass().getSimpleName()
    +                        + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms "
    +                        + "[, CONSTANT STRING options]: "
    +                        + Arrays.toString(argOIs));
    +        }
    +
    +        this.rowOI = HiveUtils.asListOI(argOIs[0]);
    +        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
    +
    +        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
    +
    +        processOptions(argOIs);
    +
    +        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
    +        this.colNorms = null;
    +        this.colProbs = null;
    +
    +        ArrayList<String> fieldNames = new ArrayList<String>();
    +        fieldNames.add("j");
    +        fieldNames.add("k");
    +        fieldNames.add("b_jk");
    +
    +        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    +        if (parseFeatureAsInt) {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +        } else {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +        }
    +        fieldOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    +
    +        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    +    }
    +
    +    @Override
    +    public void process(Object[] args) throws HiveException {
    +        Feature[] row = parseFeatures(args[0]);
    +        if (row == null) {
    +            return;
    +        }
    +        this.probes = row;
    +
    +        // since the 2nd argument (column norms) is consistent,
    +        // column-related values, `colNorms` and `colProbs`, should be cached
    +        if (colNorms == null || colProbs == null) {
    +            final int numCols = colNormsOI.getMapSize(args[1]);
    +
    +            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold`
    +                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. always accept <j, k> pairs
    +                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold);
    +                }
    +            }
    +
    +            this.colNorms = new HashMap<Object, Double>(numCols);
    +            this.colProbs = new HashMap<Object, Double>(numCols);
    +            final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]);
    +            for (Map.Entry<Object, Object> e : m.entrySet()) {
    +                Object j = e.getKey();
    +                if (parseFeatureAsInt) {
    +                    j = HiveUtils.asJavaInt(j);
    +                } else {
    +                    j = j.toString();
    +                }
    +
    +                double norm = HiveUtils.asJavaDouble(e.getValue());
    +                if (norm == 0.d) { // avoid zero-division
    +                    norm = 1.d;
    +                }
    +
    +                colNorms.put(j, norm);
    +
    +                double p = Math.min(1.d, sqrtGamma / norm);
    +                colProbs.put(j, p);
    +            }
    +        }
    +
    +        if (parseFeatureAsInt) {
    +            forwardAsIntFeature(row);
    +        } else {
    +            forwardAsStringFeature(row);
    +        }
    +    }
    +
    +    private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            int j = row[i].getFeatureIndex();
    +
    +            double norm = colNorms.get(j).doubleValue();
    --- End diff --
    
    An NPE could happen in `colNorms.get(j)`
    
    Use `PrimitiveUtils.doubleValue(colNorms.get(j), 0.d);` to make it null-safe.
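    
    For illustration, a minimal sketch of the suggested null-safe lookup (the helper class and its fallback behavior are assumptions here, not the final patch):
    
        // hypothetical null-safe lookup for forwardAsIntFeature(); avoids NPE when column j is absent
        double norm = Primitives.doubleValue(colNorms.get(j), 0.d);
        if (norm == 0.d) { // avoid zero-division (also covers a missing column)
            norm = 1.d;
        }
        double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);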



[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by takuti <gi...@git.apache.org>.
Github user takuti commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r120520063
  
    --- Diff: core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.knn.similarity;
    +
    +import hivemall.UDTFWithOptions;
    +import hivemall.fm.Feature;
    +import hivemall.fm.IntFeature;
    +import hivemall.fm.StringFeature;
    +import hivemall.math.random.PRNG;
    +import hivemall.math.random.RandomNumberGeneratorFactory;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Primitives;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.Text;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Description(
    +        name = "dimsum_mapper",
    +        value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const string options]) "
    +                + "- Returns column-wise partial similarities")
    +public class DIMSUMMapperUDTF extends UDTFWithOptions {
    +    private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class);
    +
    +    protected ListObjectInspector rowOI;
    +    protected MapObjectInspector colNormsOI;
    +
    +    @Nullable
    +    protected Feature[] probes;
    +
    +    @Nonnull
    +    protected PRNG rnd;
    +
    +    protected double threshold;
    +    protected double sqrtGamma;
    +    protected boolean symmetricOutput;
    +    protected boolean parseFeatureAsInt;
    +
    +    protected Map<Object, Double> colNorms;
    +    protected Map<Object, Double> colProbs;
    +
    +    @Override
    +    protected Options getOptions() {
    +        Options opts = new Options();
    +        opts.addOption("th", "threshold", true,
    +            "Theoretically, similarities above this threshold are estimated [default: 0.5]");
    +        opts.addOption("g", "gamma", true,
    +            "Oversampling parameter; if `gamma` is given, `threshold` will be ignored"
    +                + " [default: 10 * log(numCols) / threshold]");
    +        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
    +            "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted");
    +        opts.addOption("int_feature", "feature_as_integer", false,
    +            "Parse a feature (i.e. column ID) as integer");
    +        return opts;
    +    }
    +
    +    @Override
    +    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +            throws UDFArgumentException {
    +        double threshold = 0.5d;
    +        double gamma = Double.POSITIVE_INFINITY;
    +        boolean symmetricOutput = true;
    +        boolean parseFeatureAsInt = false;
    +
    +        CommandLine cl = null;
    +        if (argOIs.length >= 3) {
    +            String rawArgs = HiveUtils.getConstString(argOIs[2]);
    +            cl = parseOptions(rawArgs);
    +            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold);
    +            if (threshold < 0.f || threshold >= 1.f) {
    +                throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold);
    +            }
    +            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
    +            if (gamma <= 1.d) {
    +                throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma);
    +            }
    +            symmetricOutput = !cl.hasOption("disable_symmetric_output");
    +            parseFeatureAsInt = cl.hasOption("feature_as_integer");
    +        }
    +
    +        this.threshold = threshold;
    +        this.sqrtGamma = Math.sqrt(gamma);
    +        this.symmetricOutput = symmetricOutput;
    +        this.parseFeatureAsInt = parseFeatureAsInt;
    +
    +        return cl;
    +    }
    +
    +    @Override
    +    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    +        if (argOIs.length != 2 && argOIs.length != 3) {
    +            throw new UDFArgumentException(
    +                getClass().getSimpleName()
    +                        + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms "
    +                        + "[, CONSTANT STRING options]: "
    +                        + Arrays.toString(argOIs));
    +        }
    +
    +        this.rowOI = HiveUtils.asListOI(argOIs[0]);
    +        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
    +
    +        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
    +
    +        processOptions(argOIs);
    +
    +        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
    +        this.colNorms = null;
    +        this.colProbs = null;
    +
    +        ArrayList<String> fieldNames = new ArrayList<String>();
    +        fieldNames.add("j");
    +        fieldNames.add("k");
    +        fieldNames.add("b_jk");
    +
    +        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    +        if (parseFeatureAsInt) {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +        } else {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +        }
    +        fieldOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    +
    +        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    +    }
    +
    +    @Override
    +    public void process(Object[] args) throws HiveException {
    +        Feature[] row = parseFeatures(args[0]);
    +        if (row == null) {
    +            return;
    +        }
    +        this.probes = row;
    +
    +        // since the 2nd argument (column norms) is consistent,
    +        // column-related values, `colNorms` and `colProbs`, should be cached
    +        if (colNorms == null || colProbs == null) {
    +            final int numCols = colNormsOI.getMapSize(args[1]);
    +
    +            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold`
    +                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. always accept <j, k> pairs
    +                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold);
    +                }
    +            }
    +
    +            this.colNorms = new HashMap<Object, Double>(numCols);
    +            this.colProbs = new HashMap<Object, Double>(numCols);
    +            final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]);
    +            for (Map.Entry<Object, Object> e : m.entrySet()) {
    +                Object j = e.getKey();
    +                if (parseFeatureAsInt) {
    +                    j = HiveUtils.asJavaInt(j);
    +                } else {
    +                    j = j.toString();
    +                }
    +
    +                double norm = HiveUtils.asJavaDouble(e.getValue());
    +                if (norm == 0.d) { // avoid zero-division
    +                    norm = 1.d;
    +                }
    +
    +                colNorms.put(j, norm);
    +
    +                double p = Math.min(1.d, sqrtGamma / norm);
    +                colProbs.put(j, p);
    +            }
    +        }
    +
    +        if (parseFeatureAsInt) {
    +            forwardAsIntFeature(row);
    +        } else {
    +            forwardAsStringFeature(row);
    +        }
    +    }
    +
    +    private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            int j = row[i].getFeatureIndex();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new IntFeature(j, scaled);
    +        }
    +
    +        final IntWritable jWritable = new IntWritable();
    +        final IntWritable kWritable = new IntWritable();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            int j = rowScaled[ij].getFeatureIndex();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    +                for (int ik = ij + 1; ik < length; ik++) {
    +                    int k = rowScaled[ik].getFeatureIndex();
    +                    double kVal = rowScaled[ik].getValue();
    +
    +                    if (kVal != 0.d && rnd.nextDouble() < colProbs.get(k)) {
    +                        // compute b_jk
    +                        bWritable.set((float) (jVal * kVal));
    --- End diff --
    
    Yeah, you are right. Even though `jVal` and `kVal` are "scaled" by `Math.min(sqrtGamma, norm)`, they could be very large in case `gamma` is very small. Let's use double.
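    
    A rough sketch of the double-based output this would imply (the writable type and exact call sites are assumptions, not the committed change):
    
        // in initialize(): declare the b_jk output column as double instead of float
        fieldOIs.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
    
        // in forwardAsIntFeature()/forwardAsStringFeature():
        // (import org.apache.hadoop.hive.serde2.io.DoubleWritable)
        final DoubleWritable bWritable = new DoubleWritable();
        bWritable.set(jVal * kVal); // keep full double precision; no narrowing cast to float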



[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    
    [![Coverage Status](https://coveralls.io/builds/11842895/badge)](https://coveralls.io/builds/11842895)
    
    Coverage increased (+0.4%) to 39.114% when pulling **c4c4e8c67b93a572f4ee00fd80b5265079f803a9 on takuti:DIMSUM** into **10e7d450fa8257efc5d614957fda514b2b91fdee on apache:master**.




[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    
    [![Coverage Status](https://coveralls.io/builds/11860834/badge)](https://coveralls.io/builds/11860834)
    
    Coverage increased (+0.4%) to 39.519% when pulling **02b1290702cd94487c53ac8a14c5d5a985f8f17f on takuti:DIMSUM** into **1dac1a62fbf35848deca8f0a8aa96f01dc1c0f2d on apache:master**.




[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by takuti <gi...@git.apache.org>.
Github user takuti commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    @myui Thanks. Created a new issue: https://issues.apache.org/jira/browse/HIVEMALL-113



[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by myui <gi...@git.apache.org>.
Github user myui commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r120307685
  
    --- Diff: core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.knn.similarity;
    +
    +import hivemall.UDTFWithOptions;
    +import hivemall.fm.Feature;
    +import hivemall.fm.IntFeature;
    +import hivemall.fm.StringFeature;
    +import hivemall.math.random.PRNG;
    +import hivemall.math.random.RandomNumberGeneratorFactory;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Primitives;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.Text;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Description(
    +        name = "dimsum_mapper",
    +        value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const string options]) "
    +                + "- Returns column-wise partial similarities")
    +public class DIMSUMMapperUDTF extends UDTFWithOptions {
    +    private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class);
    +
    +    protected ListObjectInspector rowOI;
    +    protected MapObjectInspector colNormsOI;
    +
    +    @Nullable
    +    protected Feature[] probes;
    +
    +    @Nonnull
    +    protected PRNG rnd;
    +
    +    protected double threshold;
    +    protected double sqrtGamma;
    +    protected boolean symmetricOutput;
    +    protected boolean parseFeatureAsInt;
    +
    +    protected Map<Object, Double> colNorms;
    +    protected Map<Object, Double> colProbs;
    +
    +    @Override
    +    protected Options getOptions() {
    +        Options opts = new Options();
    +        opts.addOption("th", "threshold", true,
    +            "Theoretically, similarities above this threshold are estimated [default: 0.5]");
    +        opts.addOption("g", "gamma", true,
    +            "Oversampling parameter; if `gamma` is given, `threshold` will be ignored"
    +                + " [default: 10 * log(numCols) / threshold]");
    +        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
    +            "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted");
    +        opts.addOption("int_feature", "feature_as_integer", false,
    +            "Parse a feature (i.e. column ID) as integer");
    +        return opts;
    +    }
    +
    +    @Override
    +    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +            throws UDFArgumentException {
    +        double threshold = 0.5d;
    +        double gamma = Double.POSITIVE_INFINITY;
    +        boolean symmetricOutput = true;
    +        boolean parseFeatureAsInt = false;
    +
    +        CommandLine cl = null;
    +        if (argOIs.length >= 3) {
    +            String rawArgs = HiveUtils.getConstString(argOIs[2]);
    +            cl = parseOptions(rawArgs);
    +            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold);
    +            if (threshold < 0.f || threshold >= 1.f) {
    +                throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold);
    +            }
    +            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
    +            if (gamma <= 1.d) {
    +                throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma);
    +            }
    +            symmetricOutput = !cl.hasOption("disable_symmetric_output");
    +            parseFeatureAsInt = cl.hasOption("feature_as_integer");
    +        }
    +
    +        this.threshold = threshold;
    +        this.sqrtGamma = Math.sqrt(gamma);
    +        this.symmetricOutput = symmetricOutput;
    +        this.parseFeatureAsInt = parseFeatureAsInt;
    +
    +        return cl;
    +    }
    +
    +    @Override
    +    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    +        if (argOIs.length != 2 && argOIs.length != 3) {
    +            throw new UDFArgumentException(
    +                getClass().getSimpleName()
    +                        + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms "
    +                        + "[, CONSTANT STRING options]: "
    +                        + Arrays.toString(argOIs));
    +        }
    +
    +        this.rowOI = HiveUtils.asListOI(argOIs[0]);
    +        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
    +
    +        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
    +
    +        processOptions(argOIs);
    +
    +        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
    +        this.colNorms = null;
    +        this.colProbs = null;
    +
    +        ArrayList<String> fieldNames = new ArrayList<String>();
    +        fieldNames.add("j");
    +        fieldNames.add("k");
    +        fieldNames.add("b_jk");
    +
    +        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    +        if (parseFeatureAsInt) {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +        } else {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +        }
    +        fieldOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    +
    +        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    +    }
    +
    +    @Override
    +    public void process(Object[] args) throws HiveException {
    +        Feature[] row = parseFeatures(args[0]);
    +        if (row == null) {
    +            return;
    +        }
    +        this.probes = row;
    +
    +        // since the 2nd argument (column norms) is consistent,
    +        // column-related values, `colNorms` and `colProbs`, should be cached
    +        if (colNorms == null || colProbs == null) {
    +            final int numCols = colNormsOI.getMapSize(args[1]);
    +
    +            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold`
    +                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. always accept <j, k> pairs
    +                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold);
    +                }
    +            }
    +
    +            this.colNorms = new HashMap<Object, Double>(numCols);
    +            this.colProbs = new HashMap<Object, Double>(numCols);
    +            final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]);
    +            for (Map.Entry<Object, Object> e : m.entrySet()) {
    +                Object j = e.getKey();
    +                if (parseFeatureAsInt) {
    +                    j = HiveUtils.asJavaInt(j);
    +                } else {
    +                    j = j.toString();
    +                }
    +
    +                double norm = HiveUtils.asJavaDouble(e.getValue());
    +                if (norm == 0.d) { // avoid zero-division
    +                    norm = 1.d;
    +                }
    +
    +                colNorms.put(j, norm);
    +
    +                double p = Math.min(1.d, sqrtGamma / norm);
    +                colProbs.put(j, p);
    +            }
    +        }
    +
    +        if (parseFeatureAsInt) {
    +            forwardAsIntFeature(row);
    +        } else {
    +            forwardAsStringFeature(row);
    +        }
    +    }
    +
    +    private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            int j = row[i].getFeatureIndex();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new IntFeature(j, scaled);
    +        }
    +
    +        final IntWritable jWritable = new IntWritable();
    +        final IntWritable kWritable = new IntWritable();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            int j = rowScaled[ij].getFeatureIndex();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    +                for (int ik = ij + 1; ik < length; ik++) {
    +                    int k = rowScaled[ik].getFeatureIndex();
    +                    double kVal = rowScaled[ik].getValue();
    +
    +                    if (kVal != 0.d && rnd.nextDouble() < colProbs.get(k)) {
    +                        // compute b_jk
    +                        bWritable.set((float) (jVal * kVal));
    --- End diff --
    
    I have overflow concerns here. Is float enough (or should it be double)?



[GitHub] incubator-hivemall issue #84: [HIVEMALL-19] Support DIMSUM for approx all-pa...

Posted by takuti <gi...@git.apache.org>.
Github user takuti commented on the issue:

    https://github.com/apache/incubator-hivemall/pull/84
  
    @myui I've created a new utility method `doubleValue(Double v, double defaultValue)` in `hivemall.utils.lang.Primitives`. If this is not the appropriate class, please let me know.
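    
    A possible shape of that helper, inferred from the signature above (the committed implementation may differ):
    
        // in hivemall.utils.lang.Primitives: null-safe unboxing with a default value
        public static double doubleValue(@Nullable final Double v, final double defaultValue) {
            return (v == null) ? defaultValue : v.doubleValue();
        }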



[GitHub] incubator-hivemall pull request #84: [HIVEMALL-19] Support DIMSUM for approx...

Posted by myui <gi...@git.apache.org>.
Github user myui commented on a diff in the pull request:

    https://github.com/apache/incubator-hivemall/pull/84#discussion_r120304078
  
    --- Diff: core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package hivemall.knn.similarity;
    +
    +import hivemall.UDTFWithOptions;
    +import hivemall.fm.Feature;
    +import hivemall.fm.IntFeature;
    +import hivemall.fm.StringFeature;
    +import hivemall.math.random.PRNG;
    +import hivemall.math.random.RandomNumberGeneratorFactory;
    +import hivemall.utils.hadoop.HiveUtils;
    +import hivemall.utils.lang.Primitives;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hive.ql.exec.Description;
    +import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    +import org.apache.hadoop.hive.ql.metadata.HiveException;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    +import org.apache.hadoop.io.FloatWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.Text;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +@Description(
    +        name = "dimsum_mapper",
    +        value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const string options]) "
    +                + "- Returns column-wise partial similarities")
    +public class DIMSUMMapperUDTF extends UDTFWithOptions {
    +    private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class);
    +
    +    protected ListObjectInspector rowOI;
    +    protected MapObjectInspector colNormsOI;
    +
    +    @Nullable
    +    protected Feature[] probes;
    +
    +    @Nonnull
    +    protected PRNG rnd;
    +
    +    protected double threshold;
    +    protected double sqrtGamma;
    +    protected boolean symmetricOutput;
    +    protected boolean parseFeatureAsInt;
    +
    +    protected Map<Object, Double> colNorms;
    +    protected Map<Object, Double> colProbs;
    +
    +    @Override
    +    protected Options getOptions() {
    +        Options opts = new Options();
    +        opts.addOption("th", "threshold", true,
    +            "Theoretically, similarities above this threshold are estimated [default: 0.5]");
    +        opts.addOption("g", "gamma", true,
    +            "Oversampling parameter; if `gamma` is given, `threshold` will be ignored"
    +                + " [default: 10 * log(numCols) / threshold]");
    +        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
    +            "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted");
    +        opts.addOption("int_feature", "feature_as_integer", false,
    +            "Parse a feature (i.e. column ID) as integer");
    +        return opts;
    +    }
    +
    +    @Override
    +    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
    +            throws UDFArgumentException {
    +        double threshold = 0.5d;
    +        double gamma = Double.POSITIVE_INFINITY;
    +        boolean symmetricOutput = true;
    +        boolean parseFeatureAsInt = false;
    +
    +        CommandLine cl = null;
    +        if (argOIs.length >= 3) {
    +            String rawArgs = HiveUtils.getConstString(argOIs[2]);
    +            cl = parseOptions(rawArgs);
    +            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold);
    +            if (threshold < 0.f || threshold >= 1.f) {
    +                throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold);
    +            }
    +            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
    +            if (gamma <= 1.d) {
    +                throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma);
    +            }
    +            symmetricOutput = !cl.hasOption("disable_symmetric_output");
    +            parseFeatureAsInt = cl.hasOption("feature_as_integer");
    +        }
    +
    +        this.threshold = threshold;
    +        this.sqrtGamma = Math.sqrt(gamma);
    +        this.symmetricOutput = symmetricOutput;
    +        this.parseFeatureAsInt = parseFeatureAsInt;
    +
    +        return cl;
    +    }
    +
    +    @Override
    +    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    +        if (argOIs.length != 2 && argOIs.length != 3) {
    +            throw new UDFArgumentException(
    +                getClass().getSimpleName()
    +                        + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms "
    +                        + "[, CONSTANT STRING options]: "
    +                        + Arrays.toString(argOIs));
    +        }
    +
    +        this.rowOI = HiveUtils.asListOI(argOIs[0]);
    +        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
    +
    +        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
    +
    +        processOptions(argOIs);
    +
    +        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
    +        this.colNorms = null;
    +        this.colProbs = null;
    +
    +        ArrayList<String> fieldNames = new ArrayList<String>();
    +        fieldNames.add("j");
    +        fieldNames.add("k");
    +        fieldNames.add("b_jk");
    +
    +        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    +        if (parseFeatureAsInt) {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
    +        } else {
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    +        }
    +        fieldOIs.add(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    +
    +        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    +    }
    +
    +    @Override
    +    public void process(Object[] args) throws HiveException {
    +        Feature[] row = parseFeatures(args[0]);
    +        if (row == null) {
    +            return;
    +        }
    +        this.probes = row;
    +
    +        // since the 2nd argument (column norms) is consistent,
    +        // column-related values, `colNorms` and `colProbs`, should be cached
    +        if (colNorms == null || colProbs == null) {
    +            final int numCols = colNormsOI.getMapSize(args[1]);
    +
    +            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold`
    +                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. always accept <j, k> pairs
    +                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold);
    +                }
    +            }
    +
    +            this.colNorms = new HashMap<Object, Double>(numCols);
    +            this.colProbs = new HashMap<Object, Double>(numCols);
    +            final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]);
    +            for (Map.Entry<Object, Object> e : m.entrySet()) {
    +                Object j = e.getKey();
    +                if (parseFeatureAsInt) {
    +                    j = HiveUtils.asJavaInt(j);
    +                } else {
    +                    j = j.toString();
    +                }
    +
    +                double norm = HiveUtils.asJavaDouble(e.getValue());
    +                if (norm == 0.d) { // avoid zero-division
    +                    norm = 1.d;
    +                }
    +
    +                colNorms.put(j, norm);
    +
    +                double p = Math.min(1.d, sqrtGamma / norm);
    +                colProbs.put(j, p);
    +            }
    +        }
    +
    +        if (parseFeatureAsInt) {
    +            forwardAsIntFeature(row);
    +        } else {
    +            forwardAsStringFeature(row);
    +        }
    +    }
    +
    +    private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            int j = row[i].getFeatureIndex();
    +
    +            double norm = colNorms.get(j).doubleValue();
    +            if (norm == 0.d) { // avoid zero-division
    +                norm = 1.d;
    +            }
    +            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
    +
    +            rowScaled[i] = new IntFeature(j, scaled);
    +        }
    +
    +        final IntWritable jWritable = new IntWritable();
    +        final IntWritable kWritable = new IntWritable();
    +        final FloatWritable bWritable = new FloatWritable();
    +
    +        final Object[] forwardObjs = new Object[3];
    +        forwardObjs[0] = jWritable;
    +        forwardObjs[1] = kWritable;
    +        forwardObjs[2] = bWritable;
    +
    +        for (int ij = 0; ij < length; ij++) {
    +            int j = rowScaled[ij].getFeatureIndex();
    +            double jVal = rowScaled[ij].getValue();
    +
    +            if (jVal != 0.d && rnd.nextDouble() < colProbs.get(j)) {
    +                for (int ik = ij + 1; ik < length; ik++) {
    +                    int k = rowScaled[ik].getFeatureIndex();
    +                    double kVal = rowScaled[ik].getValue();
    +
    +                    if (kVal != 0.d && rnd.nextDouble() < colProbs.get(k)) {
    +                        // compute b_jk
    +                        bWritable.set((float) (jVal * kVal));
    +
    +                        if (symmetricOutput) {
    +                            // (j, k); similarity matrix is symmetric
    +                            jWritable.set(j);
    +                            kWritable.set(k);
    +                            forward(forwardObjs);
    +
    +                            // (k, j)
    +                            jWritable.set(k);
    +                            kWritable.set(j);
    +                            forward(forwardObjs);
    +                        } else {
    +                            if (j < k) {
    +                                jWritable.set(j);
    +                                kWritable.set(k);
    +                            } else {
    +                                jWritable.set(k);
    +                                kWritable.set(j);
    +                            }
    +                            forward(forwardObjs);
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    private void forwardAsStringFeature(@Nonnull Feature[] row) throws HiveException {
    +        final int length = row.length;
    +
    +        Feature[] rowScaled = new Feature[length];
    +        for (int i = 0; i < length; i++) {
    +            String j = row[i].getFeature();
    +
    +            double norm = colNorms.get(j).doubleValue();
    --- End diff --
    
    An NPE can occur in `colNorms.get(j).doubleValue();` here as well.

