Posted to commits@spark.apache.org by gu...@apache.org on 2021/10/21 01:14:22 UTC

[spark] branch master updated: [SPARK-37033][PYTHON] Inline type hints for python/pyspark/resource/requests.py

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 857d9f9  [SPARK-37033][PYTHON] Inline type hints for python/pyspark/resource/requests.py
857d9f9 is described below

commit 857d9f93a889f35220011f2e4a45aaf747c1e894
Author: dchvn <dg...@viettel.com.vn>
AuthorDate: Thu Oct 21 10:13:32 2021 +0900

    [SPARK-37033][PYTHON] Inline type hints for python/pyspark/resource/requests.py
    
    ### What changes were proposed in this pull request?
    Inline type hints for python/pyspark/resource/requests.py
    
    ### Why are the changes needed?
    Currently, there is a type hint stub file, python/pyspark/resource/requests.pyi, to show the expected types for functions, but we can also take advantage of static type checking within the functions by inlining the type hints.
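    
    For illustration, a minimal sketch (not taken verbatim from this patch) of what inlining means here:
    
        class ExecutorResourceRequests:
            # Before: no annotations in requests.py; the expected types lived
            # in the separate requests.pyi stub, and the body went unchecked.
            #     def memory(self, amount): ...
    
            # After: the hints are inlined, so mypy checks the function body
            # as well as the callers. (Signature as in the diff below.)
            def memory(self, amount: str) -> "ExecutorResourceRequests":
                return self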
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    Closes #34321 from dchvn/SPARK-37033.
    
    Authored-by: dchvn <dg...@viettel.com.vn>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/resource/requests.py  | 111 ++++++++++++++++++++++++++---------
 python/pyspark/resource/requests.pyi |  83 --------------------------
 2 files changed, 83 insertions(+), 111 deletions(-)

diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py
index 4deb22b..169ee1f 100644
--- a/python/pyspark/resource/requests.py
+++ b/python/pyspark/resource/requests.py
@@ -14,8 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from typing import overload, Optional, Dict
 
-from pyspark.util import _parse_memory
+from py4j.java_gateway import JavaObject, JVMView
+
+from pyspark.util import _parse_memory   # type: ignore[attr-defined]
 
 
 class ExecutorResourceRequest(object):
@@ -61,26 +64,33 @@ class ExecutorResourceRequest(object):
     -----
     This API is evolving.
     """
-    def __init__(self, resourceName, amount, discoveryScript="", vendor=""):
+
+    def __init__(
+        self,
+        resourceName: str,
+        amount: int,
+        discoveryScript: str = "",
+        vendor: str = "",
+    ):
         self._name = resourceName
         self._amount = amount
         self._discovery_script = discoveryScript
         self._vendor = vendor
 
     @property
-    def resourceName(self):
+    def resourceName(self) -> str:
         return self._name
 
     @property
-    def amount(self):
+    def amount(self) -> int:
         return self._amount
 
     @property
-    def discoveryScript(self):
+    def discoveryScript(self) -> str:
         return self._discovery_script
 
     @property
-    def vendor(self):
+    def vendor(self) -> str:
         return self._vendor
 
 
@@ -103,9 +113,25 @@ class ExecutorResourceRequests(object):
     _PYSPARK_MEM = "pyspark.memory"
     _OFFHEAP_MEM = "offHeap"
 
-    def __init__(self, _jvm=None, _requests=None):
+    @overload
+    def __init__(self, _jvm: JVMView):
+        ...
+
+    @overload
+    def __init__(
+        self,
+        _jvm: None = ...,
+        _requests: Optional[Dict[str, ExecutorResourceRequest]] = ...,
+    ):
+        ...
+
+    def __init__(
+        self,
+        _jvm: Optional[JVMView] = None,
+        _requests: Optional[Dict[str, ExecutorResourceRequest]] = None,
+    ):
         from pyspark import SparkContext
-        _jvm = _jvm or SparkContext._jvm
+        _jvm = _jvm or SparkContext._jvm  # type: ignore[attr-defined]
         if _jvm is not None:
             self._java_executor_resource_requests = \
                 _jvm.org.apache.spark.resource.ExecutorResourceRequests()
@@ -124,9 +150,9 @@ class ExecutorResourceRequests(object):
                                                                        v.discoveryScript, v.vendor)
         else:
             self._java_executor_resource_requests = None
-            self._executor_resources = {}
+            self._executor_resources: Dict[str, ExecutorResourceRequest] = {}
 
-    def memory(self, amount):
+    def memory(self, amount: str) -> "ExecutorResourceRequests":
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.memory(amount)
         else:
@@ -134,7 +160,7 @@ class ExecutorResourceRequests(object):
                                                                              _parse_memory(amount))
         return self
 
-    def memoryOverhead(self, amount):
+    def memoryOverhead(self, amount: str) -> "ExecutorResourceRequests":
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.memoryOverhead(amount)
         else:
@@ -142,7 +168,7 @@ class ExecutorResourceRequests(object):
                 ExecutorResourceRequest(self._OVERHEAD_MEM, _parse_memory(amount))
         return self
 
-    def pysparkMemory(self, amount):
+    def pysparkMemory(self, amount: str) -> "ExecutorResourceRequests":
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.pysparkMemory(amount)
         else:
@@ -150,7 +176,7 @@ class ExecutorResourceRequests(object):
                 ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount))
         return self
 
-    def offheapMemory(self, amount):
+    def offheapMemory(self, amount: str) -> "ExecutorResourceRequests":
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.offHeapMemory(amount)
         else:
@@ -158,14 +184,20 @@ class ExecutorResourceRequests(object):
                 ExecutorResourceRequest(self._OFFHEAP_MEM, _parse_memory(amount))
         return self
 
-    def cores(self, amount):
+    def cores(self, amount: int) -> "ExecutorResourceRequests":
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.cores(amount)
         else:
             self._executor_resources[self._CORES] = ExecutorResourceRequest(self._CORES, amount)
         return self
 
-    def resource(self, resourceName, amount, discoveryScript="", vendor=""):
+    def resource(
+        self,
+        resourceName: str,
+        amount: int,
+        discoveryScript: str = "",
+        vendor: str = "",
+    ) -> "ExecutorResourceRequests":
         if self._java_executor_resource_requests is not None:
             self._java_executor_resource_requests.resource(resourceName, amount, discoveryScript,
                                                            vendor)
@@ -175,7 +207,7 @@ class ExecutorResourceRequests(object):
         return self
 
     @property
-    def requests(self):
+    def requests(self) -> Dict[str, ExecutorResourceRequest]:
         if self._java_executor_resource_requests is not None:
             result = {}
             execRes = self._java_executor_resource_requests.requestsJMap()
@@ -210,16 +242,16 @@ class TaskResourceRequest(object):
     -----
     This API is evolving.
     """
-    def __init__(self, resourceName, amount):
+    def __init__(self, resourceName: str, amount: float):
         self._name = resourceName
         self._amount = float(amount)
 
     @property
-    def resourceName(self):
+    def resourceName(self) -> str:
         return self._name
 
     @property
-    def amount(self):
+    def amount(self) -> float:
         return self._amount
 
 
@@ -239,12 +271,30 @@ class TaskResourceRequests(object):
 
     _CPUS = "cpus"
 
-    def __init__(self, _jvm=None, _requests=None):
+    @overload
+    def __init__(self, _jvm: JVMView):
+        ...
+
+    @overload
+    def __init__(
+        self,
+        _jvm: None = ...,
+        _requests: Optional[Dict[str, TaskResourceRequest]] = ...,
+    ):
+        ...
+
+    def __init__(
+        self,
+        _jvm: Optional[JVMView] = None,
+        _requests: Optional[Dict[str, TaskResourceRequest]] = None,
+    ):
         from pyspark import SparkContext
-        _jvm = _jvm or SparkContext._jvm
+        _jvm = _jvm or SparkContext._jvm  # type: ignore[attr-defined]
         if _jvm is not None:
-            self._java_task_resource_requests = \
-                SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
+            self._java_task_resource_requests: Optional[JavaObject] = (
+                SparkContext._jvm.org  # type: ignore[attr-defined]
+                .apache.spark.resource.TaskResourceRequests()
+            )
             if _requests is not None:
                 for k, v in _requests.items():
                     if k == self._CPUS:
@@ -253,24 +303,29 @@ class TaskResourceRequests(object):
                         self._java_task_resource_requests.resource(v.resourceName, v.amount)
         else:
             self._java_task_resource_requests = None
-            self._task_resources = {}
+            self._task_resources: Dict[str, TaskResourceRequest] = {}
 
-    def cpus(self, amount):
+    def cpus(self, amount: int) -> "TaskResourceRequests":
         if self._java_task_resource_requests is not None:
             self._java_task_resource_requests.cpus(amount)
         else:
             self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
         return self
 
-    def resource(self, resourceName, amount):
+    def resource(
+        self,
+        resourceName: str,
+        amount: float
+    ) -> "TaskResourceRequests":
         if self._java_task_resource_requests is not None:
-            self._java_task_resource_requests.resource(resourceName, float(amount))
+            self._java_task_resource_requests.resource(
+                resourceName, float(amount))
         else:
             self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
         return self
 
     @property
-    def requests(self):
+    def requests(self) -> Dict[str, TaskResourceRequest]:
         if self._java_task_resource_requests is not None:
             result = {}
             taskRes = self._java_task_resource_requests.requestsJMap()
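
A note on the @overload pattern introduced above: the paired overloads tell a
type checker that a JVM-backed instance is built from a JVMView alone, while
the local form (_jvm=None) may also carry a plain requests dict. A minimal,
self-contained sketch of the same pattern (the JVMView stand-in and the
Requests class below are illustrative, not part of the patch):

    from typing import Dict, Optional, overload

    class JVMView:
        """Stand-in for py4j.java_gateway.JVMView, for illustration only."""

    class Requests:
        # Overload 1: callers that pass a JVMView must not pass _requests.
        @overload
        def __init__(self, _jvm: JVMView): ...

        # Overload 2: the local form omits the JVM and may pass a dict.
        @overload
        def __init__(self, _jvm: None = ..., _requests: Optional[Dict[str, int]] = ...): ...

        # The actual implementation accepts either calling convention.
        def __init__(self, _jvm: Optional[JVMView] = None,
                     _requests: Optional[Dict[str, int]] = None):
            self._jvm = _jvm
            self._requests: Dict[str, int] = dict(_requests or {})
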
diff --git a/python/pyspark/resource/requests.pyi b/python/pyspark/resource/requests.pyi
deleted file mode 100644
index 6ba14d6..0000000
--- a/python/pyspark/resource/requests.pyi
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from typing import overload, Dict, Optional
-
-from py4j.java_gateway import JVMView  # type: ignore[import]
-
-class ExecutorResourceRequest:
-    def __init__(
-        self,
-        resourceName: str,
-        amount: int,
-        discoveryScript: str = ...,
-        vendor: str = ...,
-    ) -> None: ...
-    @property
-    def resourceName(self) -> str: ...
-    @property
-    def amount(self) -> int: ...
-    @property
-    def discoveryScript(self) -> str: ...
-    @property
-    def vendor(self) -> str: ...
-
-class ExecutorResourceRequests:
-    @overload
-    def __init__(self, _jvm: JVMView) -> None: ...
-    @overload
-    def __init__(
-        self,
-        _jvm: None = ...,
-        _requests: Optional[Dict[str, ExecutorResourceRequest]] = ...,
-    ) -> None: ...
-    def memory(self, amount: str) -> ExecutorResourceRequests: ...
-    def memoryOverhead(self, amount: str) -> ExecutorResourceRequests: ...
-    def pysparkMemory(self, amount: str) -> ExecutorResourceRequests: ...
-    def offheapMemory(self, amount: str) -> ExecutorResourceRequests: ...
-    def cores(self, amount: int) -> ExecutorResourceRequests: ...
-    def resource(
-        self,
-        resourceName: str,
-        amount: int,
-        discoveryScript: str = ...,
-        vendor: str = ...,
-    ) -> ExecutorResourceRequests: ...
-    @property
-    def requests(self) -> Dict[str, ExecutorResourceRequest]: ...
-
-class TaskResourceRequest:
-    def __init__(self, resourceName: str, amount: float) -> None: ...
-    @property
-    def resourceName(self) -> str: ...
-    @property
-    def amount(self) -> float: ...
-
-class TaskResourceRequests:
-    @overload
-    def __init__(self, _jvm: JVMView) -> None: ...
-    @overload
-    def __init__(
-        self,
-        _jvm: None = ...,
-        _requests: Optional[Dict[str, TaskResourceRequest]] = ...,
-    ) -> None: ...
-    def cpus(self, amount: int) -> TaskResourceRequests: ...
-    def resource(self, resourceName: str, amount: float) -> TaskResourceRequests: ...
-    @property
-    def requests(self) -> Dict[str, TaskResourceRequest]: ...
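
With the stub deleted and the hints inlined, type checkers read the signatures
straight from requests.py. A hypothetical snippet showing the kind of misuse
mypy can now flag (variable names are illustrative):

    from pyspark.resource.requests import ExecutorResourceRequests

    reqs = ExecutorResourceRequests()
    reqs.memory("4g").cores(2)  # OK: memory() takes a str, cores() an int
    reqs.cores("two")           # mypy error: "str" where "int" is expected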
