Posted to user@phoenix.apache.org by akhil jain <ak...@gmail.com> on 2016/10/01 16:19:55 UTC

Phoenix custom UDF

I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
datatype is VARBINARY.
The data in these columns is a compressed float[] packed into a ByteBuffer
called a DenseVector, which is an ordered set of 16-bit IEEE floats with a
cardinality of no more than 3996.
I have loaded data into the Phoenix tables through the phoenix-spark plugin.
To give an idea of the pipeline: MapReduce jobs write data into Hive in
Parquet (gzip) format; I read the data into a DataFrame using
sqlContext.parquetFile(), register it as a temp table, run a
sqlContext.sql("select query ...") query, and finally call
res.save("org.apache.phoenix.spark", SaveMode.Overwrite,
Map("table" -> "SITEFLOWTABLE", "zkUrl" -> "localhost:2181")).
We have a Hive/Shark UDF (code pasted below) that can decode these
ByteBuffer columns and display them as a readable float[], and this UDF
works on spark-shell.
Now I just want to write a similar UDF in Phoenix, run queries such as
"select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from
siteflowtable", and later write UDAFs on top of it.
How do I make a Phoenix UDF return a float[]? I have tried many things but
none seem to work.
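
For completeness, here is a rough sketch of how I expect to register and call the
UDF once it works, using Phoenix 4.8's user-defined function support (the DDL
syntax, the jar path, the phoenix.functions.allowUserDefinedFunctions requirement,
and whether the SQL name must match the class's NAME constant are assumptions on
my part, not verified):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class RegisterDenseVectorUdf {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement()) {
            // Register the UDF jar (assumed to already be on HDFS). The name given here
            // is the name used in SQL; it may need to match the class's NAME constant.
            stmt.execute("CREATE FUNCTION DENSEVECTORUDF(VARBINARY) RETURNS FLOAT ARRAY "
                    + "AS 'org.apache.phoenix.expression.function.DenseVectorFunction' "
                    + "USING JAR 'hdfs:///phoenix/udf/densevector-udf.jar'");
            // Then call it the same way as in the query above.
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) FROM siteflowtable")) {
                while (rs.next()) {
                    System.out.println(rs.getArray(2));
                }
            }
        }
    }
}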

Below is the code for the Hive/Shark UDF:
------------------------------------------------------------------------------------------
package com.ABCD.densevectorudf;

import java.nio.ByteBuffer;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.FloatWritable;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.Utility;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

@Description(name = "DenseVectorUDF",
    value = "Dense Vector UDF in Hive / Shark\n"
        + "_FUNC_(binary|hex) - "
        + "Returns the dense vector array<float> value\n",
    extended = "Dense Vector UDAF in Hive / Shark")
public class DenseVectorUDF extends GenericUDF {
    private static final Log LOG = LogFactory.getLog(DenseVectorUDF.class.getName());
    private ObjectInspector inputOI;
    private ListObjectInspector outputOI;

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("densevectorudf(");
        for (int i = 0; i < children.length; i++) {
            sb.append(children[i]);
            if (i + 1 != children.length) {
                sb.append(",");
            }
        }
        sb.append(")");
        return sb.toString();
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length == 1) {
            ObjectInspector first = arguments[0];
            if (!(first instanceof StringObjectInspector) && !(first instanceof BinaryObjectInspector)) {
                LOG.error("first argument must be either a binary or hex buffer");
                throw new UDFArgumentException("first argument must be either a binary or hex buffer");
            }
            inputOI = first;
            // Output is array<float>, i.e. a standard list of writable floats.
            outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
                PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
        } else {
            throw new UDFArgumentLengthException("Exactly one argument is expected.");
        }
        return outputOI;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
        Object object = arguments[0].get();
        Vector<Float> floatVector = null;
        ByteBuffer buff = null;
        if (inputOI instanceof StringObjectInspector) {
            // Hex-encoded input: decode to raw bytes first.
            String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(object);
            buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
        } else if (inputOI instanceof BinaryObjectInspector) {
            byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObject(object);
            buff = ByteBuffer.wrap(bytes);
        }
        floatVector = idv.getElements(buff);
        Object[] red = new Object[floatVector.size()];
        for (int index = 0; index < red.length; index++) {
            red[index] = new FloatWritable(floatVector.get(index));
        }
        LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
        return red;
    }
}

------------------------------------------------------------------------------------------


Following is the code I have written for the Phoenix UDF:
------------------------------------------------------------------------------------------
package org.apache.phoenix.expression.function;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
import org.apache.phoenix.parse.FunctionParseNode.Argument;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PFloatArray;
import org.apache.phoenix.schema.types.PVarbinary;

import java.nio.ByteBuffer;
import java.nio.FloatBuffer;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;

@BuiltInFunction(name = DenseVectorFunction.NAME, args = {
        @Argument(allowedTypes = {PVarbinary.class})})
public class DenseVectorFunction extends ScalarFunction {
    public static final String NAME = "DenseVectorFunction";
    private ListObjectInspector outputOI;

    public DenseVectorFunction() {
    }

    public DenseVectorFunction(List<Expression> children) throws SQLException {
        super(children);
    }

    @Override
    public String getName() {
        return NAME;
    }

    public Expression getElementExpr() {
        return children.get(0);
    }

    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        if (!getElementExpr().evaluate(tuple, ptr)) {
            return false;
        }
        Object element = getElementExpr().getDataType().toObject(ptr,
                getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
                getElementExpr().getScale());
        IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
        PhoenixBinaryObjectInspector pboi = new PhoenixBinaryObjectInspector();
        byte[] bytes = pboi.getPrimitiveJavaObject(element);
        Object object = ptr.get();
        Vector<Float> floatVector = null;
        ByteBuffer buff = null;
        buff = ByteBuffer.wrap(bytes);
        floatVector = idv.getElements(buff);

        Object[] red = new Object[floatVector.size()];
        for (int index = 0; index < red.length; index++) {
            red[index] = new FloatWritable(floatVector.get(index));
            System.out.println("" + floatVector.get(index));
        }
        // This prints the header info in the ByteBuffer, which is correct.
        System.out.println("Buffer header = " + BufferOperations.stringifyBuffer(buff));
        // HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY?
        ptr.set(??);
        return true;
    }

    @Override
    public SortOrder getSortOrder() {
        return children.get(0).getSortOrder();
    }

    @Override
    public PDataType getDataType() {
        return PFloatArray.INSTANCE;
    }
}
------------------------------------------------------------------------------------------

Any help will be much appreciated.

Re: Phoenix custom UDF

Posted by akhil jain <ak...@gmail.com>.
Hi Rajesh,

Thanks for the reply. But surely we can write UDAFs along the lines of the
built-in SUM and AVG functions that Phoenix ships with by default.
We need a good tutorial or documentation for this.

Thanks,
AJ


Re: Phoenix custom UDF

Posted by "rajeshbabu@apache.org" <ch...@gmail.com>.
Hi Akhil,

There is no support for UDAFs in Phoenix at present.

Thanks,
Rajeshbabu.


Re: Phoenix custom UDF

Posted by akhil jain <ak...@gmail.com>.
Thanks James. It worked.

Can you please give me some pointers on writing UDAFs in Phoenix, like the
GenericUDAFEvaluator we have for writing Hive UDAFs?
I am looking for a tutorial for Phoenix along the lines of
http://beekeeperdata.com/posts/hadoop/2015/08/17/hive-udaf-tutorial.html.

Thanks,
Akhil


Re: Phoenix custom UDF

Posted by James Taylor <ja...@apache.org>.
Hi Akhil,
You want to create an Array, convert it to its byte[] representation, and
set the ptr argument to point to it. Take a look at ArrayIT for examples of
creating an Array:

    // Create Array of FLOAT
    Float[] floatArr = new Float[2];
    floatArr[0] = 64.87f;
    floatArr[1] = 89.96f;
    Array array = conn.createArrayOf("FLOAT", floatArr);
    // Convert to byte[]
    byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
    // Set ptr to byte[]
    ptr.set(arrayAsBytes);

Thanks,
James
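
For anyone adapting this to the DenseVectorFunction above: inside a ScalarFunction
there is no JDBC Connection to call createArrayOf() on, so the array has to be built
directly. A minimal sketch of the tail of evaluate(), assuming
PArrayDataType.instantiatePhoenixArray is available in this Phoenix version (a sketch
of the approach described above, not verified against 4.8):

    // Assumed additional imports: org.apache.phoenix.schema.PhoenixArray,
    // org.apache.phoenix.schema.types.PArrayDataType, org.apache.phoenix.schema.types.PFloat.

    // ... inside evaluate(), after floatVector has been decoded from the VARBINARY bytes:
    Object[] elements = new Object[floatVector.size()];
    for (int i = 0; i < elements.length; i++) {
        // Plain java.lang.Float values, not FloatWritable; Phoenix serializes Java types.
        elements[i] = floatVector.get(i);
    }
    // Build a PhoenixArray of FLOAT and serialize it with the FLOAT ARRAY type.
    PhoenixArray array = PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, elements);
    byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
    // Point ptr at the serialized array; getDataType() already returns PFloatArray.INSTANCE.
    ptr.set(arrayAsBytes);
    return true;

On the client side, conn.createArrayOf("FLOAT", floatArr) followed by
PFloatArray.INSTANCE.toBytes(array), as shown above, produces the same bytes.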

