Posted to issues@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2022/01/27 12:59:00 UTC

[jira] [Created] (FLINK-25856) Fix use of UserDefinedType in from_elements

Huang Xingbo created FLINK-25856:
------------------------------------

             Summary: Fix use of UserDefinedType in from_elements
                 Key: FLINK-25856
                 URL: https://issues.apache.org/jira/browse/FLINK-25856
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.14.3, 1.15.0
            Reporter: Huang Xingbo
            Assignee: Huang Xingbo
             Fix For: 1.15.0, 1.14.4


If we define a new UserDefinedType and use it in `from_elements`, the call fails.

{code:python}
from pyflink.table import DataTypes
from pyflink.table.types import UserDefinedType


class VectorUDT(UserDefinedType):
    @classmethod
    def sql_type(cls):
        return DataTypes.ROW(
            [
                DataTypes.FIELD("type", DataTypes.TINYINT()),
                DataTypes.FIELD("size", DataTypes.INT()),
                DataTypes.FIELD("indices", DataTypes.ARRAY(DataTypes.INT())),
                DataTypes.FIELD("values", DataTypes.ARRAY(DataTypes.DOUBLE())),
            ]
        )

    @classmethod
    def module(cls):
        return "pyflink.ml.core.linalg"

    def serialize(self, obj):
        if isinstance(obj, SparseVector):
            indices = [int(i) for i in obj._indices]
            values = [float(v) for v in obj._values]
            return 0, obj.size(), indices, values
        elif isinstance(obj, DenseVector):
            values = [float(v) for v in obj._values]
            return 1, None, None, values
        else:
            # Use %-formatting: str.format() would leave the %r placeholders unfilled.
            raise TypeError("Cannot serialize %r of type %r" % (obj, type(obj)))
{code}
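
To make the tuple layout returned by `serialize` concrete, here is a minimal sketch that runs without Flink. The `DenseVector` and `SparseVector` classes below are hypothetical stand-ins, not the real `pyflink.ml.core.linalg` classes, and `serialize_vector` is an assumed free-function copy of the method's logic. It only shows how each tuple position lines up with the `type`, `size`, `indices`, and `values` fields declared in `sql_type`; it does not reproduce the `from_elements` failure.

```python
class DenseVector:
    """Hypothetical stand-in: stores only the values."""

    def __init__(self, values):
        self._values = list(values)

    def size(self):
        return len(self._values)


class SparseVector:
    """Hypothetical stand-in: stores size, indices, and non-zero values."""

    def __init__(self, size, indices, values):
        self._size = size
        self._indices = list(indices)
        self._values = list(values)

    def size(self):
        return self._size


def serialize_vector(obj):
    """Return (type, size, indices, values), one entry per ROW field."""
    if isinstance(obj, SparseVector):
        indices = [int(i) for i in obj._indices]
        values = [float(v) for v in obj._values]
        return 0, obj.size(), indices, values
    elif isinstance(obj, DenseVector):
        values = [float(v) for v in obj._values]
        # Dense vectors leave size and indices unset.
        return 1, None, None, values
    else:
        raise TypeError("Cannot serialize %r of type %r" % (obj, type(obj)))


dense_row = serialize_vector(DenseVector([1, 2, 3, 4]))
sparse_row = serialize_vector(SparseVector(4, [0, 3], [1, 5]))
print(dense_row)   # (1, None, None, [1.0, 2.0, 3.0, 4.0])
print(sparse_row)  # (0, 4, [0, 3], [1.0, 5.0])
```

The leading tag (0 for sparse, 1 for dense) is what would let a matching deserializer decide how to interpret the remaining three fields.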

{code:python}
self.t_env.from_elements(
    [
        (Vectors.dense([1, 2, 3, 4]), 0., 1.),
        (Vectors.dense([2, 2, 3, 4]), 0., 2.),
        (Vectors.dense([3, 2, 3, 4]), 0., 3.),
        (Vectors.dense([4, 2, 3, 4]), 0., 4.),
        (Vectors.dense([5, 2, 3, 4]), 0., 5.),
        (Vectors.dense([11, 2, 3, 4]), 1., 1.),
        (Vectors.dense([12, 2, 3, 4]), 1., 2.),
        (Vectors.dense([13, 2, 3, 4]), 1., 3.),
        (Vectors.dense([14, 2, 3, 4]), 1., 4.),
        (Vectors.dense([15, 2, 3, 4]), 1., 5.),
    ],
    DataTypes.ROW([
        DataTypes.FIELD("features", VectorUDT()),
        DataTypes.FIELD("label", DataTypes.DOUBLE()),
        DataTypes.FIELD("weight", DataTypes.DOUBLE()),
    ]),
)
{code}



