You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/02/24 08:29:38 UTC
[flink-ml] branch master updated: [FLINK-26266][ml][python] Support Vector and Matrix in ML Python API
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 10cae7b [FLINK-26266][ml][python] Support Vector and Matrix in ML Python API
10cae7b is described below
commit 10cae7b7a656f68dc4ea9f5bdc32441c141e0bbf
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Tue Feb 22 18:00:44 2022 +0800
[FLINK-26266][ml][python] Support Vector and Matrix in ML Python API
This closes #64.
---
flink-ml-python/dev/dev-requirements.txt | 3 +-
flink-ml-python/pyflink/ml/core/linalg.py | 544 ++++++++++++++++++++++++
flink-ml-python/pyflink/ml/tests/test_linalg.py | 79 ++++
flink-ml-python/setup.py | 2 +-
4 files changed, 626 insertions(+), 2 deletions(-)
diff --git a/flink-ml-python/dev/dev-requirements.txt b/flink-ml-python/dev/dev-requirements.txt
index 0c32456..9c8d89e 100755
--- a/flink-ml-python/dev/dev-requirements.txt
+++ b/flink-ml-python/dev/dev-requirements.txt
@@ -20,4 +20,5 @@ jsonpickle==2.0.0
cloudpickle==1.2.2
pytest==4.4.1
flake8==4.0.1
-mypy==0.910
\ No newline at end of file
+mypy==0.910
+numpy>=1.14.3,<1.20
\ No newline at end of file
diff --git a/flink-ml-python/pyflink/ml/core/linalg.py b/flink-ml-python/pyflink/ml/core/linalg.py
new file mode 100644
index 0000000..cd9cacd
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/core/linalg.py
@@ -0,0 +1,544 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
import array
import numbers
from abc import ABC, abstractmethod
from typing import Union, Sized

import numpy as np
+
+
class Vector(ABC):
    """
    Abstract class for DenseVector and SparseVector.
    """

    @abstractmethod
    def size(self) -> int:
        """
        Gets the size of the vector.
        """
        pass

    @abstractmethod
    def get(self, i: int):
        """
        Gets the value of the ith element.
        """
        pass

    @abstractmethod
    def to_array(self) -> np.ndarray:
        """
        Converts the vector into a numpy.ndarray.
        """
        pass

    @staticmethod
    def _equals(v1_indices, v1_values, v2_indices, v2_values) -> bool:
        """
        Compares two vectors given in (indices, values) form, ignoring explicitly stored zeros.

        The two vectors are considered equal iff their non-zero entries occur at the same
        indices with the same values. Assumes both index sequences are in increasing order,
        as produced by DenseVector (0..n-1) and SparseVector (checked in its constructor).
        """
        v1_size = len(v1_values)
        v2_size = len(v2_values)
        k1 = 0
        k2 = 0
        while True:
            # Explicitly stored zeros do not affect equality, so skip them on both sides.
            while k1 < v1_size and v1_values[k1] == 0:
                k1 += 1
            while k2 < v2_size and v2_values[k2] == 0:
                k2 += 1

            # Once either side runs out of non-zeros, they are equal only if both did.
            if k1 >= v1_size or k2 >= v2_size:
                return k1 >= v1_size and k2 >= v2_size

            if v1_indices[k1] != v2_indices[k2] or v1_values[k1] != v2_values[k2]:
                return False
            k1 += 1
            k2 += 1

    def __len__(self):
        return self.size()

    def __getitem__(self, key):
        """
        Returns the element at integer position ``key`` by delegating to :meth:`get`.

        Any integral key is accepted, including numpy integer scalars such as ``np.int64``
        (numpy registers its integer types with ``numbers.Integral``).

        :raises TypeError: if ``key`` is not an integral value.
        """
        if isinstance(key, numbers.Integral):
            return self.get(int(key))
        raise TypeError("Invalid argument type")
+
+
class DenseVector(Vector):
    """
    A dense vector represented by a value array. We use numpy array for storage and arithmetic
    will be delegated to the underlying numpy array.

    Examples:
    ::

        >>> v = Vectors.dense([1.0, 2.0])
        >>> u = Vectors.dense([3.0, 4.0])
        >>> v + u
        DenseVector([4.0, 6.0])
        >>> 2 - v
        DenseVector([1.0, 0.0])
        >>> v / 2
        DenseVector([0.5, 1.0])
        >>> v * u
        DenseVector([3.0, 8.0])
        >>> u / v
        DenseVector([3.0, 2.0])
        >>> u % 2
        DenseVector([1.0, 0.0])
        >>> -v
        DenseVector([-1.0, -2.0])
        >>> v + SparseVector(2, [1], [5.0])
        DenseVector([1.0, 7.0])
    """

    def __init__(self, values):
        """
        :param values: A numpy array or any sequence convertible to a float64 numpy array.
                       A numpy array that is already float64 is stored as-is (not copied), so
                       later mutations of that array are visible through this vector.
        """
        if not isinstance(values, np.ndarray):
            values = np.array(values, dtype=np.float64)
        if values.dtype != np.float64:
            values = values.astype(np.float64)
        self._values = values

    def size(self) -> int:
        return len(self._values)

    def get(self, i: int):
        return self._values[i]

    def to_array(self) -> np.ndarray:
        """
        Returns the underlying numpy.ndarray (not a copy).
        """
        return self._values

    def dot(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
        """
        Dot product of two Vectors.

        Examples:
        ::

            >>> import array
            >>> dense = DenseVector(array.array('d', [1., 2.]))
            >>> dense.dot(dense)
            5.0
            >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
            4.0
            >>> dense.dot(range(1, 3))
            5.0
            >>> dense.dot(np.array(range(1, 3)))
            5.0
            >>> dense.dot(np.reshape([1., 2., 3., 4.], (2, 2), order='F'))
            array([ 5., 11.])
        """
        if isinstance(other, np.ndarray):
            # numpy performs its own shape check (and supports 2-D right operands).
            return np.dot(self._values, other)

        assert len(self) == len(other), "dimension mismatch"
        if isinstance(other, SparseVector):
            # Let the sparse side drive so only its stored entries are touched.
            return other.dot(self)
        if isinstance(other, Vector):
            return np.dot(self._values, other.to_array())
        return np.dot(self._values, other)

    def squared_distance(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
        """
        Squared distance of two Vectors.

        Examples:
        ::

            >>> import array
            >>> dense1 = DenseVector(array.array('d', [1., 2.]))
            >>> dense1.squared_distance(dense1)
            0.0
            >>> dense2 = np.array((2., 1.))
            >>> dense1.squared_distance(dense2)
            2.0
            >>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
            >>> dense1.squared_distance(sparse1)
            2.0
        """
        assert len(self) == len(other), "dimension mismatch"
        if isinstance(other, SparseVector):
            return other.squared_distance(self)

        if isinstance(other, Vector):
            other = other.to_array()
        elif not isinstance(other, np.ndarray):
            other = np.array(other)
        diff = self._values - other
        return np.dot(diff, diff)

    @property
    def values(self):
        """
        Returns the underlying numpy.ndarray
        """
        return self._values

    def __getitem__(self, item):
        # Delegates to numpy indexing, so negative indices and slices are supported.
        return self._values[item]

    def __len__(self):
        return self.size()

    def __eq__(self, other):
        if isinstance(other, DenseVector):
            return np.array_equal(self._values, other._values)
        elif isinstance(other, SparseVector):
            if self.size() != other.size():
                return False
            return Vector._equals(
                list(range(len(self))), self._values, other._indices, other._values)
        return False

    def __str__(self):
        return "[" + ",".join([str(v) for v in self._values]) + "]"

    def __repr__(self):
        return "DenseVector([%s])" % (", ".join(str(i) for i in self._values))

    # Class-body helpers used below to generate the arithmetic dunder methods; each delegates
    # to the same-named method of the underlying numpy array and wraps the result.
    def _unary_op(op):
        def _(self):
            return DenseVector(getattr(self._values, op)())

        return _

    def _binary_op(op):
        def _(self, other):
            # Convert any Vector (dense or sparse) to a plain ndarray so numpy never has to
            # coerce a Vector object element by element.
            if isinstance(other, Vector):
                other = other.to_array()
            return DenseVector(getattr(self._values, op)(other))

        return _

    # arithmetic functions
    __add__ = _binary_op("__add__")  # type: ignore
    __sub__ = _binary_op("__sub__")  # type: ignore
    __mul__ = _binary_op("__mul__")  # type: ignore
    __truediv__ = _binary_op("__truediv__")  # type: ignore
    __mod__ = _binary_op("__mod__")  # type: ignore
    __radd__ = _binary_op("__radd__")  # type: ignore
    __rsub__ = _binary_op("__rsub__")  # type: ignore
    __rmul__ = _binary_op("__rmul__")  # type: ignore
    __rtruediv__ = _binary_op("__rtruediv__")  # type: ignore
    __rmod__ = _binary_op("__rmod__")  # type: ignore
    __neg__ = _unary_op("__neg__")  # type: ignore
+
+
class SparseVector(Vector):
    """
    A sparse vector, using either a dict, a list of (index, value) pairs, or two separate
    arrays of indices and values.

    Example:
    ::

        >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
        SparseVector(4, {1: 1.0, 3: 5.5})
        >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
        SparseVector(4, {1: 1.0, 3: 5.5})
        >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
        SparseVector(4, {1: 1.0, 3: 5.5})
    """

    def __init__(self, size: int, *args):
        """
        :param size: The size of the vector.
        :param args: Either one argument (a dict of index -> value, or a list of
                     (index, value) pairs; sorted by index here), or two arguments (an
                     array of indices and an array of values of the same length, which must
                     already be strictly increasing by index).
        :raises TypeError: if the indices are not strictly increasing.
        """
        self._size = size
        assert 1 <= len(args) <= 2, "The number of arguments must be 2 or 3"
        if len(args) == 1:
            # a dict, a list of (index, value) pairs
            pairs = args[0]
            if isinstance(pairs, dict):
                pairs = pairs.items()
            pairs = sorted(pairs)
            self._indices = np.array([p[0] for p in pairs], dtype=np.int32)
            self._values = np.array([p[1] for p in pairs], dtype=np.float64)
        else:
            assert len(args[0]) == len(args[1]), "The length of indices and values should be same"
            # two separate arrays of indices and values.
            self._indices = np.array(args[0], dtype=np.int32)
            self._values = np.array(args[1], dtype=np.float64)
        for i in range(len(self._indices) - 1):
            if self._indices[i] >= self._indices[i + 1]:
                raise TypeError(
                    "Indices {0} and {1} are not strictly increasing".format(
                        self._indices[i], self._indices[i + 1]))

    def size(self) -> int:
        return self._size

    def get(self, i: int):
        """
        Gets the value of the ith element.

        Elements that are not explicitly stored are 0.0. Negative indices count from the end,
        matching DenseVector's numpy-based indexing.

        :raises IndexError: if i is outside [-size, size).
        """
        if i < -self._size or i >= self._size:
            raise IndexError("Index %d is out of range [-%d, %d)" % (i, self._size, self._size))
        if i < 0:
            i += self._size
        # The indices are strictly increasing (checked in __init__), so binary search finds
        # the only possible position of i; it is a hit only if the stored index matches.
        pos = np.searchsorted(self._indices, i)
        if pos < len(self._indices) and self._indices[pos] == i:
            return self._values[pos]
        return np.float64(0.0)

    def to_array(self) -> np.ndarray:
        """
        Returns a copy of this SparseVector as a 1-dimensional NumPy array.
        """
        arr = np.zeros((self._size,), dtype=np.float64)
        arr[self._indices] = self._values
        return arr

    def dot(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
        """
        Dot product of two Vectors.

        Examples:
        ::

            >>> sparse = SparseVector(4, [1, 3], [3.0, 4.0])
            >>> sparse.dot(sparse)
            25.0
            >>> sparse.dot(array.array('d', [1., 2., 3., 4.]))
            22.0
            >>> sparse2 = SparseVector(4, [2], [1.0])
            >>> sparse.dot(sparse2)
            0.0
            >>> sparse.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
            array([22., 22.])
        """
        if isinstance(other, np.ndarray):
            if other.ndim not in (2, 1):
                raise ValueError('Cannot call dot with %d-dimensional array' % other.ndim)
            assert len(self) == other.shape[0], "dimension mismatch"
            # Only the rows matching stored indices contribute.
            return np.dot(self._values, other[self._indices])

        assert len(self) == len(other), "dimension mismatch"

        if isinstance(other, DenseVector):
            return np.dot(other.to_array()[self._indices], self._values)
        elif isinstance(other, SparseVector):
            # Restrict both sides to the indices they have in common.
            self_cmind = np.in1d(self._indices, other._indices, assume_unique=True)
            self_values = self._values[self_cmind]
            if self_values.size == 0:
                return np.float64(0.0)
            else:
                other_cmind = np.in1d(other._indices, self._indices, assume_unique=True)
                return np.dot(self_values, other._values[other_cmind])
        else:
            if isinstance(other, (array.array, np.ndarray, list, tuple, range)):
                return self.dot(DenseVector(other))
            raise ValueError('Cannot call with the type %s' % (type(other)))

    def squared_distance(self, other: Union[Vector, np.ndarray, Sized]) -> np.ndarray:
        """
        Squared distance of two Vectors.

        Examples:
        ::

            >>> sparse = SparseVector(4, [1, 3], [3.0, 4.0])
            >>> sparse.squared_distance(sparse)
            0.0
            >>> sparse.squared_distance(array.array('d', [1., 2., 3., 4.]))
            11.0
            >>> sparse.squared_distance(np.array([1., 2., 3., 4.]))
            11.0
            >>> sparse2 = SparseVector(4, [2], [1.0])
            >>> sparse.squared_distance(sparse2)
            26.0
            >>> sparse2.squared_distance(sparse)
            26.0
        """
        assert len(self) == len(other), "dimension mismatch"

        if isinstance(other, (np.ndarray, DenseVector)):
            if isinstance(other, np.ndarray) and other.ndim != 1:
                raise ValueError(
                    "Cannot call squared_distance with %d-dimensional array" % other.ndim)

            if isinstance(other, DenseVector):
                other = other.to_array()
            # Split the dense side into positions stored in this vector and the rest; the
            # rest contributes its own squared values (this vector is 0 there).
            sparse_ind = np.zeros(other.size, dtype=bool)
            sparse_ind[self._indices] = True
            dist = other[sparse_ind] - self._values
            result = np.dot(dist, dist)

            other_ind = other[~sparse_ind]
            result += np.dot(other_ind, other_ind)
            return result
        elif isinstance(other, SparseVector):
            # Merge-walk the two sorted index arrays.
            i = 0
            j = 0
            result = 0.0
            while i < len(self._indices) and j < len(other._indices):
                if self._indices[i] == other._indices[j]:
                    diff = self._values[i] - other._values[j]
                    result += diff * diff
                    i += 1
                    j += 1
                elif self._indices[i] < other._indices[j]:
                    result += self._values[i] * self._values[i]
                    i += 1
                else:
                    result += other._values[j] * other._values[j]
                    j += 1
            while i < len(self._indices):
                result += self._values[i] * self._values[i]
                i += 1
            while j < len(other._indices):
                result += other._values[j] * other._values[j]
                j += 1
            return np.float64(result)
        else:
            if isinstance(other, (array.array, np.ndarray, list, tuple, range)):
                return self.squared_distance(DenseVector(other))
            raise ValueError('Cannot call with the type %s' % (type(other)))

    def __len__(self):
        return self.size()

    def __eq__(self, other):
        if isinstance(other, SparseVector):
            return (other.size() == self.size()
                    and np.array_equal(other._indices, self._indices)
                    and np.array_equal(other._values, self._values))
        elif isinstance(other, DenseVector):
            # Compare the size value (not the bound method) against the dense length.
            if self._size != len(other):
                return False
            return Vector._equals(
                self._indices, self._values, list(range(len(other))), other.to_array())
        return False

    def __str__(self):
        inds = "[" + ",".join([str(i) for i in self._indices]) + "]"
        vals = "[" + ",".join([str(v) for v in self._values]) + "]"
        return "(" + ",".join((str(self._size), inds, vals)) + ")"

    def __repr__(self):
        inds = self._indices
        vals = self._values
        entries = ", ".join(["{0}: {1}".format(inds[i], float(vals[i])) for i in range(len(inds))])
        return "SparseVector({0}, {{{1}}})".format(self._size, entries)
+
+
class Vectors:
    """
    Factory helpers for building DenseVector and SparseVector instances.
    """

    @staticmethod
    def dense(*elements) -> DenseVector:
        """
        Create a dense vector of 64-bit floats from a Python list or numbers.

        A single non-numeric argument (list, numpy array, other iterable) is used as the
        whole value sequence; otherwise the positional numbers themselves are the values.

        Examples:
        ::

            >>> Vectors.dense([1, 2, 3])
            DenseVector([1.0, 2.0, 3.0])
            >>> Vectors.dense(1.0, 2.0)
            DenseVector([1.0, 2.0])
        """
        wraps_container = len(elements) == 1 and not isinstance(elements[0], (float, int))
        values = elements[0] if wraps_container else elements
        return DenseVector(values)

    @staticmethod
    def sparse(size: int, *args):
        """
        Create a sparse vector, using either a dict, a list of (index, value) pairs, or two separate
        arrays of indices and values.

        Examples:
        ::

            >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
            SparseVector(4, {1: 1.0, 3: 5.5})
            >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
            SparseVector(4, {1: 1.0, 3: 5.5})
            >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
            SparseVector(4, {1: 1.0, 3: 5.5})

        :param size: The size of the vector.
        :param args: Non-zero entries, as a dictionary, list of tuples,
                     or two sorted lists containing indices and values.
        """
        vector = SparseVector(size, *args)
        return vector
+
+
class Matrix(ABC):
    """
    A matrix of double values.

    Concrete subclasses must provide the shape accessors, element access, and conversion
    to a numpy.ndarray.
    """

    @abstractmethod
    def num_rows(self) -> int:
        """
        Gets the number of rows of the matrix.
        """

    @abstractmethod
    def num_cols(self) -> int:
        """
        Gets the number of columns of the matrix.
        """

    @abstractmethod
    def get(self, i: int, j: int) -> float:
        """
        Gets the value at row i and column j.
        """

    @abstractmethod
    def to_array(self) -> np.ndarray:
        """
        Convert the matrix into an numpy.ndarray
        """
+
+
class DenseMatrix(Matrix):
    """
    Column-major dense matrix. The entry values are stored in a single array of doubles with columns
    listed in sequence.

    Examples:
    ::

        >>> m = DenseMatrix(2, 3, [1., 2., 3., 4., 5., 6.])
        >>> m.get(1, 0)
        2.0
        >>> m[0, 2]
        5.0
    """

    def __init__(self, num_rows: int, num_cols: int, values):
        """
        :param num_rows: Number of rows.
        :param num_cols: Number of columns.
        :param values: num_rows * num_cols values in column-major order; converted to a
                       float64 numpy array.
        """
        assert len(values) == num_rows * num_cols, \
            "The length of values should be num_rows * num_cols"
        self._num_rows = num_rows
        self._num_cols = num_cols
        self._values = np.asarray(values, dtype=np.float64)

    def num_rows(self) -> int:
        return self._num_rows

    def num_cols(self) -> int:
        return self._num_cols

    def get(self, i: int, j: int) -> float:
        """
        Gets the value at row i and column j.

        :raises IndexError: if i or j is out of range (negative indices are not supported).
        """
        if i < 0 or i >= self._num_rows:
            raise IndexError("Row index %d is out of range [0, %d)" % (i, self._num_rows))
        if j >= self._num_cols or j < 0:
            raise IndexError("Column index %d is out of range [0, %d)" % (j, self._num_cols))
        # Column-major layout: column j starts at offset num_rows * j.
        return self._values[self._num_rows * j + i]

    def to_array(self) -> np.ndarray:
        """
        Return a numpy.ndarray of shape (num_rows, num_cols). The result is produced by
        reshaping the underlying storage and may be a view of it rather than a copy.

        Examples:
        ::

            >>> m = DenseMatrix(2, 2, range(4))
            >>> m.to_array()
            array([[0., 2.],
                   [1., 3.]])
        """
        return self._values.reshape((self._num_rows, self._num_cols), order="F")

    def __getitem__(self, indices):
        i, j = indices
        return self.get(i, j)

    def __eq__(self, other):
        if not isinstance(other, Matrix):
            return False
        if self._num_rows != other.num_rows() or self._num_cols != other.num_cols():
            return False

        self_values = np.ravel(self.to_array(), order="F")
        other_values = np.ravel(other.to_array(), order="F")
        # Return a plain bool rather than numpy.bool_.
        return bool(np.all(self_values == other_values))

    def __repr__(self):
        return "DenseMatrix(%d, %d, [%s])" % (
            self._num_rows, self._num_cols, ", ".join(str(v) for v in self._values))
diff --git a/flink-ml-python/pyflink/ml/tests/test_linalg.py b/flink-ml-python/pyflink/ml/tests/test_linalg.py
new file mode 100644
index 0000000..2095b4a
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/tests/test_linalg.py
@@ -0,0 +1,79 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import unittest
+import numpy as np
+import array
+
+from pyflink.ml.core.linalg import SparseVector, DenseVector
+
+
class VectorTests(unittest.TestCase):
    """Unit tests for the dot product, squared distance and equality of vectors."""

    def test_dot(self):
        sparse = SparseVector(4, {1: 1, 3: 2})
        dense = DenseVector(np.array([1.0, 2.0, 3.0, 4.0]))
        dense_from_list = DenseVector([1, 2, 3, 4])
        row = [1.0, 2.0, 3.0, 4.0]
        square = np.array([row, row, row, row])
        doubles = array.array("d", [0, 1, 2, 3])

        self.assertEqual(10.0, sparse.dot(dense))
        self.assertTrue(np.array_equal(np.array([3.0, 6.0, 9.0, 12.0]), sparse.dot(square)))
        self.assertEqual(30.0, dense.dot(dense))
        self.assertTrue(np.array_equal(np.array([10.0, 20.0, 30.0, 40.0]), dense.dot(square)))
        self.assertEqual(30.0, dense_from_list.dot(dense))
        self.assertTrue(
            np.array_equal(np.array([10.0, 20.0, 30.0, 40.0]), dense_from_list.dot(square)))
        self.assertEqual(7.0, sparse.dot(doubles))

    def test_squared_distance(self):
        def distance(left, right):
            # Dispatch on whichever operand is a vector.
            if not isinstance(left, (DenseVector, SparseVector)):
                return right.squared_distance(left)
            return left.squared_distance(right)

        sparse = SparseVector(4, {1: 1, 3: 2})
        dense = DenseVector(np.array([1.0, 2.0, 3.0, 4.0]))
        reversed_dense = DenseVector([4, 3, 2, 1])
        reversed_list = [4, 3, 2, 1]
        doubles = array.array("d", [0, 2, 1, 3])
        nd = np.array([0, 2, 1, 3])

        expectations = [
            (15.0, sparse, dense),
            (25.0, sparse, reversed_dense),
            (20.0, dense, reversed_dense),
            (15.0, dense, sparse),
            (25.0, reversed_dense, sparse),
            (20.0, reversed_dense, dense),
            (0.0, sparse, sparse),
            (0.0, dense, dense),
            (0.0, reversed_dense, reversed_dense),
            (25.0, sparse, reversed_list),
            (3.0, sparse, doubles),
            (3.0, sparse, nd),
        ]
        for expected, left, right in expectations:
            self.assertEqual(expected, distance(left, right))

    def test_eq(self):
        dense_a = DenseVector([0.0, 1.0, 0.0, 5.5])
        sparse_a = SparseVector(4, [(1, 1.0), (3, 5.5)])
        dense_a_copy = DenseVector([0.0, 1.0, 0.0, 5.5])
        sparse_longer = SparseVector(6, [(1, 1.0), (3, 5.5)])
        dense_b = DenseVector([0.0, 1.0, 0.0, 2.5])
        sparse_b = SparseVector(4, [(1, 1.0), (3, 2.5)])

        self.assertEqual(dense_a, sparse_a)
        self.assertEqual(dense_a, dense_a_copy)
        self.assertFalse(sparse_a == sparse_longer)
        self.assertFalse(dense_a == dense_b)
        self.assertFalse(dense_a == sparse_b)
diff --git a/flink-ml-python/setup.py b/flink-ml-python/setup.py
index 319ba55..bdd9da7 100644
--- a/flink-ml-python/setup.py
+++ b/flink-ml-python/setup.py
@@ -104,7 +104,7 @@ try:
author_email='dev@flink.apache.org',
python_requires='>=3.6',
install_requires=['apache-flink==1.14.0', 'pandas>=1.0,<1.2.0', 'jsonpickle==2.0.0',
- 'cloudpickle==1.2.2'],
+ 'cloudpickle==1.2.2', 'numpy>=1.14.3,<1.20'],
tests_require=['pytest==4.4.1'],
description='Apache Flink ML Python API',
long_description=long_description,