You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/02/24 08:29:38 UTC

[flink-ml] branch master updated: [FLINK-26266][ml][python] Support Vector and Matrix in ML Python API

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 10cae7b  [FLINK-26266][ml][python] Support Vector and Matrix in ML Python API
10cae7b is described below

commit 10cae7b7a656f68dc4ea9f5bdc32441c141e0bbf
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Tue Feb 22 18:00:44 2022 +0800

    [FLINK-26266][ml][python] Support Vector and Matrix in ML Python API
    
    This closes #64.
---
 flink-ml-python/dev/dev-requirements.txt        |   3 +-
 flink-ml-python/pyflink/ml/core/linalg.py       | 544 ++++++++++++++++++++++++
 flink-ml-python/pyflink/ml/tests/test_linalg.py |  79 ++++
 flink-ml-python/setup.py                        |   2 +-
 4 files changed, 626 insertions(+), 2 deletions(-)

diff --git a/flink-ml-python/dev/dev-requirements.txt b/flink-ml-python/dev/dev-requirements.txt
index 0c32456..9c8d89e 100755
--- a/flink-ml-python/dev/dev-requirements.txt
+++ b/flink-ml-python/dev/dev-requirements.txt
@@ -20,4 +20,5 @@ jsonpickle==2.0.0
 cloudpickle==1.2.2
 pytest==4.4.1
 flake8==4.0.1
-mypy==0.910
\ No newline at end of file
+mypy==0.910
+numpy>=1.14.3,<1.20
\ No newline at end of file
diff --git a/flink-ml-python/pyflink/ml/core/linalg.py b/flink-ml-python/pyflink/ml/core/linalg.py
new file mode 100644
index 0000000..cd9cacd
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/core/linalg.py
@@ -0,0 +1,544 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import array
+from abc import ABC, abstractmethod
+from typing import Union, Sized
+
+import numpy as np
+
+
+class Vector(ABC):
+    """
+    Abstract base class for DenseVector and SparseVector.
+
+    Subclasses implement :meth:`size`, :meth:`get` and :meth:`to_array`. ``len(v)`` and
+    ``v[i]`` are provided here on top of those methods.
+    """
+
+    @abstractmethod
+    def size(self) -> int:
+        """
+        Gets the size of the vector.
+        """
+        pass
+
+    @abstractmethod
+    def get(self, i: int):
+        """
+        Gets the value of the ith element.
+        """
+        pass
+
+    @abstractmethod
+    def to_array(self) -> np.ndarray:
+        """
+        Convert the vector into a numpy.ndarray
+        """
+        pass
+
+    @staticmethod
+    def _equals(v1_indices, v1_values, v2_indices, v2_values):
+        """
+        Compares two vectors given as parallel (indices, values) sequences.
+
+        Entries whose value is 0 are skipped on both sides, so an explicitly stored zero
+        compares equal to an entry that is not stored at all. The remaining entries must
+        match pairwise, in order, in both index and value.
+
+        :param v1_indices: Indices of the first vector, parallel to ``v1_values``.
+        :param v1_values: Values of the first vector.
+        :param v2_indices: Indices of the second vector, parallel to ``v2_values``.
+        :param v2_values: Values of the second vector.
+        :return: True if the non-zero entries of both vectors are identical.
+        """
+        v1_size = len(v1_values)
+        v2_size = len(v2_values)
+        k1 = 0
+        k2 = 0
+        while True:
+            # Advance each cursor past stored zeros.
+            while k1 < v1_size and v1_values[k1] == 0:
+                k1 += 1
+            while k2 < v2_size and v2_values[k2] == 0:
+                k2 += 1
+
+            # Once either side is exhausted, the vectors are equal only if both are.
+            if k1 >= v1_size or k2 >= v2_size:
+                return k1 >= v1_size and k2 >= v2_size
+
+            if v1_indices[k1] != v2_indices[k2] or v1_values[k1] != v2_values[k2]:
+                return False
+            k1 += 1
+            k2 += 1
+
+    def __len__(self):
+        return self.size()
+
+    def __getitem__(self, key):
+        """
+        Returns ``self.get(key)`` for an integer key.
+
+        :raises TypeError: If ``key`` is neither a Python int nor a NumPy integer.
+        """
+        # NumPy integer scalars (e.g. values taken from an index array) are not instances of
+        # ``int``, so they are accepted explicitly.
+        if isinstance(key, (int, np.integer)):
+            return self.get(key)
+        raise TypeError("Invalid argument type")
+
+
+class DenseVector(Vector):
+    """
+    A dense vector represented by a value array. We use numpy array for storage and arithmetic
+    will be delegated to the underlying numpy array.
+
+    The right-hand operand of an arithmetic operator may be a scalar, a numpy array or any
+    :class:`Vector`. A SparseVector operand is converted to its dense form first.
+
+    Examples:
+    ::
+
+        >>> v = Vectors.dense([1.0, 2.0])
+        >>> u = Vectors.dense([3.0, 4.0])
+        >>> v + u
+        DenseVector([4.0, 6.0])
+        >>> 2 - v
+        DenseVector([1.0, 0.0])
+        >>> v / 2
+        DenseVector([0.5, 1.0])
+        >>> v * u
+        DenseVector([3.0, 8.0])
+        >>> u / v
+        DenseVector([3.0, 2.0])
+        >>> u % 2
+        DenseVector([1.0, 0.0])
+        >>> -v
+        DenseVector([-1.0, -2.0])
+        >>> v + Vectors.sparse(2, {0: 1.0})
+        DenseVector([2.0, 2.0])
+    """
+
+    def __init__(self, values):
+        """
+        :param values: A numpy array or any object ``numpy.array`` can convert. The data is
+                       stored as float64. A float64 ndarray is stored as passed, without
+                       copying.
+        """
+        if not isinstance(values, np.ndarray):
+            values = np.array(values, dtype=np.float64)
+        if values.dtype != np.float64:
+            values = values.astype(np.float64)
+        self._values = values
+
+    def size(self) -> int:
+        return len(self._values)
+
+    def get(self, i: int):
+        return self._values[i]
+
+    def to_array(self) -> np.ndarray:
+        """
+        Returns the underlying numpy.ndarray itself, not a copy.
+        """
+        return self._values
+
+    def dot(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
+        """
+        Dot product of two Vectors.
+
+        A numpy array operand is passed straight to ``numpy.dot``, so a 2-dimensional array
+        yields an array result. Any other operand must have the same length as this vector.
+
+        Examples:
+        ::
+
+            >>> import  array
+            >>> dense = DenseVector(array.array('d', [1., 2.]))
+            >>> dense.dot(dense)
+            5.0
+            >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
+            4.0
+            >>> dense.dot(range(1, 3))
+            5.0
+            >>> dense.dot(np.array(range(1, 3)))
+            5.0
+            >>> dense.dot(np.reshape([1., 2., 3., 4.], (2, 2), order='F'))
+            array([ 5., 11.])
+        """
+        if isinstance(other, np.ndarray):
+            return np.dot(self._values, other)
+
+        assert len(self) == len(other), "dimension mismatch"
+        if isinstance(other, SparseVector):
+            # The sparse implementation only touches the stored entries.
+            return other.dot(self)
+        if isinstance(other, Vector):
+            return np.dot(self._values, other.to_array())
+        return np.dot(self._values, other)
+
+    def squared_distance(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
+        """
+        Squared distance of two Vectors.
+
+        The operand must have the same length as this vector.
+
+        Examples:
+        ::
+
+            >>> import array
+            >>> dense1 = DenseVector(array.array('d', [1., 2.]))
+            >>> dense1.squared_distance(dense1)
+            0.0
+            >>> dense2 = np.array((2., 1.))
+            >>> dense1.squared_distance(dense2)
+            2.0
+            >>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
+            >>> dense1.squared_distance(sparse1)
+            2.0
+        """
+        assert len(self) == len(other), "dimension mismatch"
+        if isinstance(other, SparseVector):
+            return other.squared_distance(self)
+
+        if isinstance(other, Vector):
+            other = other.to_array()
+        elif not isinstance(other, np.ndarray):
+            other = np.array(other)
+        diff = self._values - other
+        return np.dot(diff, diff)
+
+    @property
+    def values(self):
+        """
+        Returns the underlying numpy.ndarray
+        """
+        return self._values
+
+    def __getitem__(self, item):
+        # Delegates to numpy indexing, so slices and index arrays work as well as integers.
+        return self._values[item]
+
+    def __len__(self):
+        return self.size()
+
+    def __eq__(self, other):
+        """
+        Compares with another vector by value.
+
+        A SparseVector is equal when it has the same size and the same non-zero entries.
+        Any object that is not a DenseVector or SparseVector compares unequal.
+        """
+        if isinstance(other, DenseVector):
+            return np.array_equal(self._values, other._values)
+        elif isinstance(other, SparseVector):
+            if self.size() != other.size():
+                return False
+            return Vector._equals(
+                list(range(len(self))), self._values, other._indices, other._values)
+        return False
+
+    def __str__(self):
+        return "[" + ",".join([str(v) for v in self._values]) + "]"
+
+    def __repr__(self):
+        return "DenseVector([%s])" % (", ".join(str(i) for i in self._values))
+
+    def _unary_op(op):
+        # Class-body factory: builds a method that applies the named ndarray operator.
+        def _(self):
+            return DenseVector(getattr(self._values, op)())
+
+        return _
+
+    def _binary_op(op):
+        # Class-body factory: builds a method that applies the named ndarray operator to
+        # this vector's values and the operand.
+        def _(self, other):
+            if isinstance(other, Vector):
+                # Unwrap dense and sparse vectors to a plain ndarray so numpy never has to
+                # treat a Vector as a generic sequence.
+                other = other.to_array()
+            result = getattr(self._values, op)(other)
+            if result is NotImplemented:
+                # Let Python try the reflected operation or raise its own TypeError.
+                return NotImplemented
+            return DenseVector(result)
+
+        return _
+
+    # arithmetic functions
+    __add__ = _binary_op("__add__")  # type: ignore
+    __sub__ = _binary_op("__sub__")  # type: ignore
+    __mul__ = _binary_op("__mul__")  # type: ignore
+    __truediv__ = _binary_op("__truediv__")  # type: ignore
+    __mod__ = _binary_op("__mod__")  # type: ignore
+    __radd__ = _binary_op("__radd__")  # type: ignore
+    __rsub__ = _binary_op("__rsub__")  # type: ignore
+    __rmul__ = _binary_op("__rmul__")  # type: ignore
+    __rtruediv__ = _binary_op("__rtruediv__")  # type: ignore
+    __rmod__ = _binary_op("__rmod__")  # type: ignore
+    __neg__ = _unary_op("__neg__")  # type: ignore
+
+
+class SparseVector(Vector):
+    """
+    A sparse vector, using either a dict, a list of (index, value) pairs, or two separate
+    arrays of indices and values.
+
+    Indices are stored as an int32 array in increasing order and values as a float64 array.
+
+    Example:
+    ::
+
+        >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
+        SparseVector(4, {1: 1.0, 3: 5.5})
+    """
+
+    def __init__(self, size: int, *args):
+        """
+        :param size: The size of the vector.
+        :param args: Either one argument (a dict or an iterable of (index, value) pairs, which
+                     is sorted here) or two arguments (indices and values of equal length,
+                     where the indices must already be strictly increasing).
+        :raises TypeError: If two arrays are given and the indices are not strictly
+                           increasing.
+        """
+        self._size = size
+        assert 1 <= len(args) <= 2, "The number of arguments must be 2 or 3"
+        if len(args) == 1:
+            # a dict, a list of (index, value) pairs
+            pairs = args[0]
+            if isinstance(pairs, dict):
+                pairs = pairs.items()
+            pairs = sorted(pairs)
+            self._indices = np.array([p[0] for p in pairs], dtype=np.int32)
+            self._values = np.array([p[1] for p in pairs], dtype=np.float64)
+        else:
+            assert len(args[0]) == len(args[1]), "The length of indices and values should be same"
+            # two separate arrays of indices and values.
+            self._indices = np.array(args[0], dtype=np.int32)
+            self._values = np.array(args[1], dtype=np.float64)
+            for current, following in zip(self._indices[:-1], self._indices[1:]):
+                if current >= following:
+                    raise TypeError(
+                        "Indices {0} and {1} are not strictly increasing".format(
+                            current, following))
+
+    def size(self) -> int:
+        return self._size
+
+    def get(self, i: int):
+        """
+        Gets the value of the ith element; entries that are not stored are 0.0.
+
+        Negative indices count from the end of the vector.
+
+        :raises IndexError: If ``i`` is outside ``[-size, size)``.
+        """
+        if i < -self._size or i >= self._size:
+            raise IndexError(
+                "Index %d is out of range for a vector of size %d" % (i, self._size))
+        if i < 0:
+            i += self._size
+        # searchsorted returns the insertion position, which holds ``i`` only when it is
+        # actually stored.
+        pos = self._indices.searchsorted(i)
+        if pos < len(self._indices) and self._indices[pos] == i:
+            return self._values[pos]
+        return np.float64(0.0)
+
+    def to_array(self) -> np.ndarray:
+        """
+        Returns a copy of this SparseVector as a 1-dimensional NumPy array.
+        """
+        arr = np.zeros((self._size,), dtype=np.float64)
+        arr[self._indices] = self._values
+        return arr
+
+    def dot(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
+        """
+        Dot product of two Vectors.
+
+        Examples:
+        ::
+
+            >>> sparse = SparseVector(4, [1, 3], [3.0, 4.0])
+            >>> sparse.dot(sparse)
+            25.0
+            >>> sparse.dot(array.array('d', [1., 2., 3., 4.]))
+            22.0
+            >>> sparse2 = SparseVector(4, [2], [1.0])
+            >>> sparse.dot(sparse2)
+            0.0
+            >>> sparse.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+            array([22., 22.])
+
+        :raises ValueError: If ``other`` is a numpy array that is not 1- or 2-dimensional, or
+                            is of an unsupported type.
+        """
+        if isinstance(other, np.ndarray):
+            if other.ndim not in (2, 1):
+                raise ValueError('Cannot call dot with %d-dimensional array' % other.ndim)
+            assert len(self) == other.shape[0], "dimension mismatch"
+            # Only the rows at the stored indices contribute to the product.
+            return np.dot(self._values, other[self._indices])
+
+        assert len(self) == len(other), "dimension mismatch"
+
+        if isinstance(other, DenseVector):
+            return np.dot(other.to_array()[self._indices], self._values)
+        if isinstance(other, SparseVector):
+            # Masks of the indices stored in both vectors; both index arrays are sorted, so
+            # the selected values line up pairwise.
+            self_cmind = np.isin(self._indices, other._indices, assume_unique=True)
+            self_values = self._values[self_cmind]
+            if self_values.size == 0:
+                return np.float64(0.0)
+            other_cmind = np.isin(other._indices, self._indices, assume_unique=True)
+            return np.dot(self_values, other._values[other_cmind])
+        if isinstance(other, (array.array, list, tuple, range)):
+            return self.dot(DenseVector(other))
+        raise ValueError('Cannot call with the type %s' % (type(other)))
+
+    def squared_distance(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
+        """
+        Squared distance of two Vectors.
+
+        Examples:
+        ::
+
+            >>> sparse = SparseVector(4, [1, 3], [3.0, 4.0])
+            >>> sparse.squared_distance(sparse)
+            0.0
+            >>> sparse.squared_distance(array.array('d', [1., 2., 3., 4.]))
+            11.0
+            >>> sparse.squared_distance(np.array([1., 2., 3., 4.]))
+            11.0
+            >>> sparse2 = SparseVector(4, [2], [1.0])
+            >>> sparse.squared_distance(sparse2)
+            26.0
+            >>> sparse2.squared_distance(sparse)
+            26.0
+
+        :raises ValueError: If ``other`` is a numpy array that is not 1-dimensional, or is of
+                            an unsupported type.
+        """
+        assert len(self) == len(other), "dimension mismatch"
+
+        if isinstance(other, (np.ndarray, DenseVector)):
+            if isinstance(other, np.ndarray) and other.ndim != 1:
+                raise ValueError(
+                    "Cannot call squared_distance with %d-dimensional array" % other.ndim)
+
+            if isinstance(other, DenseVector):
+                other = other.to_array()
+            # Positions stored in this vector contribute (other - value)^2; all remaining
+            # positions contribute other^2 because this vector is 0 there.
+            sparse_ind = np.zeros(other.size, dtype=bool)
+            sparse_ind[self._indices] = True
+            dist = other[sparse_ind] - self._values
+            result = np.dot(dist, dist)
+
+            other_ind = other[~sparse_ind]
+            result += np.dot(other_ind, other_ind)
+            return result
+        if isinstance(other, SparseVector):
+            # Merge-walk over the two sorted index arrays.
+            i = 0
+            j = 0
+            result = 0.0
+            while i < len(self._indices) and j < len(other._indices):
+                if self._indices[i] == other._indices[j]:
+                    diff = self._values[i] - other._values[j]
+                    result += diff * diff
+                    i += 1
+                    j += 1
+                elif self._indices[i] < other._indices[j]:
+                    result += self._values[i] * self._values[i]
+                    i += 1
+                else:
+                    result += other._values[j] * other._values[j]
+                    j += 1
+            # Whatever is left on either side has no counterpart in the other vector.
+            while i < len(self._indices):
+                result += self._values[i] * self._values[i]
+                i += 1
+            while j < len(other._indices):
+                result += other._values[j] * other._values[j]
+                j += 1
+            return np.float64(result)
+        if isinstance(other, (array.array, list, tuple, range)):
+            return self.squared_distance(DenseVector(other))
+        raise ValueError('Cannot call with the type %s' % (type(other)))
+
+    def __len__(self):
+        return self.size()
+
+    def __eq__(self, other):
+        """
+        Compares with another vector by value.
+
+        Two SparseVectors are equal when size, stored indices and stored values all match.
+        A DenseVector is equal when it has the same size and the same non-zero entries.
+        Any other object compares unequal.
+        """
+        if isinstance(other, SparseVector):
+            return (other.size() == self.size()
+                    and np.array_equal(other._indices, self._indices)
+                    and np.array_equal(other._values, self._values))
+        elif isinstance(other, DenseVector):
+            if self.size() != len(other):
+                return False
+            return Vector._equals(
+                self._indices, self._values, list(range(len(other))), other.to_array())
+        return False
+
+    def __str__(self):
+        inds = "[" + ",".join([str(i) for i in self._indices]) + "]"
+        vals = "[" + ",".join([str(v) for v in self._values]) + "]"
+        return "(" + ",".join((str(self._size), inds, vals)) + ")"
+
+    def __repr__(self):
+        entries = ", ".join(
+            "{0}: {1}".format(index, float(value))
+            for index, value in zip(self._indices, self._values))
+        return "SparseVector({0}, {{{1}}})".format(self._size, entries)
+
+
+class Vectors(object):
+    """
+    Factory methods for creating DenseVector and SparseVector instances.
+    """
+
+    @staticmethod
+    def dense(*elements) -> DenseVector:
+        """
+        Create a dense vector of 64-bit floats from a Python list or numbers.
+
+        Examples:
+        ::
+
+            >>> Vectors.dense([1, 2, 3])
+            DenseVector([1.0, 2.0, 3.0])
+            >>> Vectors.dense(1.0, 2.0)
+            DenseVector([1.0, 2.0])
+
+        :param elements: Either the numbers themselves or a single list, numpy array or other
+                         iterable holding them.
+        """
+        # A single Python or NumPy scalar is a one-element vector, not a container to unwrap.
+        scalar_types = (float, int, np.floating, np.integer)
+        if len(elements) == 1 and not isinstance(elements[0], scalar_types):
+            # it's list, numpy.array or other iterable object.
+            elements = elements[0]
+        return DenseVector(elements)
+
+    @staticmethod
+    def sparse(size: int, *args) -> SparseVector:
+        """
+        Create a sparse vector, using either a dict, a list of (index, value) pairs, or two separate
+        arrays of indices and values.
+
+        Examples:
+        ::
+
+            >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
+            SparseVector(4, {1: 1.0, 3: 5.5})
+            >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+            SparseVector(4, {1: 1.0, 3: 5.5})
+            >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
+            SparseVector(4, {1: 1.0, 3: 5.5})
+
+        :param size: The size of the vector.
+        :param args: Non-zero entries, as a dictionary, list of tuples,
+                     or two sorted lists containing indices and values.
+        """
+        return SparseVector(size, *args)
+
+
+class Matrix(ABC):
+    """
+    Abstract interface for a matrix of double values.
+    """
+
+    @abstractmethod
+    def num_rows(self) -> int:
+        """
+        Gets the number of rows.
+        """
+
+    @abstractmethod
+    def num_cols(self) -> int:
+        """
+        Gets the number of columns.
+        """
+
+    @abstractmethod
+    def get(self, i: int, j: int) -> float:
+        """
+        Gets the value at row i and column j.
+        """
+
+    @abstractmethod
+    def to_array(self) -> np.ndarray:
+        """
+        Convert the matrix into a numpy.ndarray
+        """
+
+
+class DenseMatrix(Matrix):
+    """
+    Column-major dense matrix. The entry values are stored in a single array of doubles with columns
+    listed in sequence.
+    """
+
+    def __init__(self, num_rows: int, num_cols: int, values):
+        """
+        :param num_rows: The number of rows.
+        :param num_cols: The number of columns.
+        :param values: The entries in column-major order. It must support ``len()`` and
+                       contain exactly ``num_rows * num_cols`` elements.
+        """
+        assert len(values) == num_rows * num_cols
+        self._num_rows = num_rows
+        self._num_cols = num_cols
+        self._values = np.asarray(values, dtype=np.float64)
+
+    def num_rows(self) -> int:
+        return self._num_rows
+
+    def num_cols(self) -> int:
+        return self._num_cols
+
+    def get(self, i: int, j: int) -> float:
+        """
+        Gets the value at row i and column j.
+
+        :raises IndexError: If either index is negative or not below the matrix dimension.
+        """
+        if i < 0 or i >= self._num_rows:
+            raise IndexError("Row index %d is out of range [0, %d)" % (i, self._num_rows))
+        if j >= self._num_cols or j < 0:
+            raise IndexError("Column index %d is out of range [0, %d)" % (j, self._num_cols))
+        # Column-major layout: column j starts at offset num_rows * j.
+        return self._values[self._num_rows * j + i]
+
+    def to_array(self) -> np.ndarray:
+        """
+        Return a numpy.ndarray
+
+        Examples:
+        ::
+
+            >>> m = DenseMatrix(2, 2, range(4))
+            >>> m.to_array()
+            array([[0., 2.],
+                   [1., 3.]])
+        """
+        return self._values.reshape((self._num_rows, self._num_cols), order="F")
+
+    def __getitem__(self, indices):
+        # Supports ``m[i, j]``; the key must unpack into exactly two indices.
+        i, j = indices
+        return self.get(i, j)
+
+    def __eq__(self, other):
+        """
+        Compares with another Matrix by dimensions and entries.
+
+        :return: A builtin bool; False for any object that is not a Matrix.
+        """
+        if not isinstance(other, Matrix):
+            return False
+        if self._num_rows != other.num_rows() or self._num_cols != other.num_cols():
+            return False
+
+        self_values = np.ravel(self.to_array(), order="F")
+        other_values = np.ravel(other.to_array(), order="F")
+        # np.all yields numpy.bool_; convert so callers always get a plain bool.
+        return bool(np.all(self_values == other_values))
diff --git a/flink-ml-python/pyflink/ml/tests/test_linalg.py b/flink-ml-python/pyflink/ml/tests/test_linalg.py
new file mode 100644
index 0000000..2095b4a
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/tests/test_linalg.py
@@ -0,0 +1,79 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+import numpy as np
+import array
+
+from pyflink.ml.core.linalg import SparseVector, DenseVector
+
+
+class VectorTests(unittest.TestCase):
+    """
+    Tests dot product, squared distance and equality of DenseVector and SparseVector.
+    """
+
+    def test_dot(self):
+        sparse = SparseVector(4, {1: 1, 3: 2})
+        dense = DenseVector(np.array([1.0, 2.0, 3.0, 4.0]))
+        from_list = DenseVector([1, 2, 3, 4])
+        # Four identical rows [1, 2, 3, 4].
+        matrix = np.array([[1.0, 2.0, 3.0, 4.0]] * 4)
+        typed_array = array.array("d", [0, 1, 2, 3])
+        times_three = np.array([3.0, 6.0, 9.0, 12.0])
+        times_ten = np.array([10.0, 20.0, 30.0, 40.0])
+
+        self.assertEqual(10.0, sparse.dot(dense))
+        self.assertTrue(np.array_equal(times_three, sparse.dot(matrix)))
+        self.assertEqual(30.0, dense.dot(dense))
+        self.assertTrue(np.array_equal(times_ten, dense.dot(matrix)))
+        self.assertEqual(30.0, from_list.dot(dense))
+        self.assertTrue(np.array_equal(times_ten, from_list.dot(matrix)))
+        self.assertEqual(7.0, sparse.dot(typed_array))
+
+    def test_squared_distance(self):
+        def squared_distance(a, b):
+            # Call the method on whichever operand is a vector so that plain sequences may
+            # appear on either side.
+            if not isinstance(a, (DenseVector, SparseVector)):
+                a, b = b, a
+            return a.squared_distance(b)
+
+        sparse = SparseVector(4, {1: 1, 3: 2})
+        dense = DenseVector(np.array([1.0, 2.0, 3.0, 4.0]))
+        reversed_dense = DenseVector([4, 3, 2, 1])
+        plain_list = [4, 3, 2, 1]
+        typed_array = array.array("d", [0, 2, 1, 3])
+        int_ndarray = np.array([0, 2, 1, 3])
+
+        # Vector against vector, in both argument orders.
+        self.assertEqual(15.0, squared_distance(sparse, dense))
+        self.assertEqual(25.0, squared_distance(sparse, reversed_dense))
+        self.assertEqual(20.0, squared_distance(dense, reversed_dense))
+        self.assertEqual(15.0, squared_distance(dense, sparse))
+        self.assertEqual(25.0, squared_distance(reversed_dense, sparse))
+        self.assertEqual(20.0, squared_distance(reversed_dense, dense))
+        # A vector is at distance zero from itself.
+        self.assertEqual(0.0, squared_distance(sparse, sparse))
+        self.assertEqual(0.0, squared_distance(dense, dense))
+        self.assertEqual(0.0, squared_distance(reversed_dense, reversed_dense))
+        # Sparse vector against plain sequences and arrays.
+        self.assertEqual(25.0, squared_distance(sparse, plain_list))
+        self.assertEqual(3.0, squared_distance(sparse, typed_array))
+        self.assertEqual(3.0, squared_distance(sparse, int_ndarray))
+
+    def test_eq(self):
+        dense = DenseVector([0.0, 1.0, 0.0, 5.5])
+        sparse_same = SparseVector(4, [(1, 1.0), (3, 5.5)])
+        dense_same = DenseVector([0.0, 1.0, 0.0, 5.5])
+        sparse_longer = SparseVector(6, [(1, 1.0), (3, 5.5)])
+        dense_other_value = DenseVector([0.0, 1.0, 0.0, 2.5])
+        sparse_other_value = SparseVector(4, [(1, 1.0), (3, 2.5)])
+
+        self.assertEqual(dense, sparse_same)
+        self.assertEqual(dense, dense_same)
+        self.assertFalse(sparse_same == sparse_longer)
+        self.assertFalse(dense == dense_other_value)
+        self.assertFalse(dense == sparse_other_value)
diff --git a/flink-ml-python/setup.py b/flink-ml-python/setup.py
index 319ba55..bdd9da7 100644
--- a/flink-ml-python/setup.py
+++ b/flink-ml-python/setup.py
@@ -104,7 +104,7 @@ try:
         author_email='dev@flink.apache.org',
         python_requires='>=3.6',
         install_requires=['apache-flink==1.14.0', 'pandas>=1.0,<1.2.0', 'jsonpickle==2.0.0',
-                          'cloudpickle==1.2.2'],
+                          'cloudpickle==1.2.2', 'numpy>=1.14.3,<1.20'],
         tests_require=['pytest==4.4.1'],
         description='Apache Flink ML Python API',
         long_description=long_description,