Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/19 10:34:54 UTC

[spark] branch master updated: [SPARK-44096][PYTHON][DOCS] Make examples copy-pastable by adding a newline in all modules

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fc7eeb39aa [SPARK-44096][PYTHON][DOCS] Make examples copy-pastable by adding a newline in all modules
0fc7eeb39aa is described below

commit 0fc7eeb39aad5997912c8a3f82aea089a4985898
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jun 19 13:34:42 2023 +0300

    [SPARK-44096][PYTHON][DOCS] Make examples copy-pastable by adding a newline in all modules
    
    ### What changes were proposed in this pull request?
    
    I found many more instances of the same issue as in https://github.com/apache/spark/pull/41655. This PR addresses all such examples across every PySpark component.
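    
    For illustration, here is one of the doctests touched in this patch (from `python/pyspark/rdd.py`). Without the bare `...` continuation line after the one-line `def`, pasting the rendered example into a plain Python shell typically fails with a SyntaxError, because the interpreter is still reading the function block when the next statement arrives:
    
    ```python
    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    ...
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
    ```
    
    The added `...` renders as an empty continuation prompt, so the copied text carries the blank line needed to close the `def` block before the next statement runs.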
    
    ### Why are the changes needed?
    
    See https://github.com/apache/spark/pull/41655.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it changes the documentation and makes the examples copy-pastable; see also https://github.com/apache/spark/pull/41655.
    
    ### How was this patch tested?
    
    CI in this PR should validate the changes. They are logically the same as https://github.com/apache/spark/pull/41655. I will also build the documentation locally and verify the rendered examples.
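    
    As a quick local sanity check (a sketch, not the project's test entry point), the standard `doctest` module already treats a bare `...` line as a continuation prompt rather than as expected output, so the added lines do not change any doctest results:
    
    ```python
    import doctest
    
    def sample():
        """
        Mirrors the pattern added throughout this patch: the bare "..." after
        a one-line def is parsed as a continuation prompt, not as output.
    
        >>> def f(x): return x + 1
        ...
        >>> f(1)
        2
        """
    
    # Runs the docstring's examples; prints nothing when they all pass.
    doctest.run_docstring_examples(sample, {}, verbose=False)
    ```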
    
    Closes #41657 from HyukjinKwon/minor-newlines.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 python/pyspark/accumulators.py         |  4 ++++
 python/pyspark/context.py              |  4 ++++
 python/pyspark/ml/functions.py         | 21 +++++++--------------
 python/pyspark/ml/torch/distributor.py |  2 ++
 python/pyspark/mllib/clustering.py     |  2 ++
 python/pyspark/rdd.py                  |  9 +++++++++
 python/pyspark/sql/dataframe.py        |  4 ++++
 python/pyspark/sql/functions.py        |  4 ++++
 python/pyspark/sql/pandas/group_ops.py |  6 ++++++
 python/pyspark/sql/streaming/query.py  |  2 ++
 python/pyspark/sql/types.py            |  1 +
 python/pyspark/sql/udtf.py             |  1 +
 12 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index dc8520a844d..a95bd9debfc 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -88,12 +88,14 @@ class Accumulator(Generic[T]):
     >>> def f(x):
     ...     global a
     ...     a += x
+    ...
     >>> rdd.foreach(f)
     >>> a.value
     13
     >>> b = sc.accumulator(0)
     >>> def g(x):
     ...     b.add(x)
+    ...
     >>> rdd.foreach(g)
     >>> b.value
     6
@@ -106,6 +108,7 @@ class Accumulator(Generic[T]):
     >>> def h(x):
     ...     global a
     ...     a.value = 7
+    ...
     >>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL
     Traceback (most recent call last):
         ...
@@ -198,6 +201,7 @@ class AccumulatorParam(Generic[T]):
     >>> def g(x):
     ...     global va
     ...     va += [x] * 3
+    ...
     >>> rdd = sc.parallelize([1,2,3])
     >>> rdd.foreach(g)
     >>> va.value
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6f5094963be..51a4db67e8c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1802,6 +1802,7 @@ class SparkContext:
         >>> def f(x):
         ...     global acc
         ...     acc += 1
+        ...
         >>> rdd.foreach(f)
         >>> acc.value
         15
@@ -2140,6 +2141,7 @@ class SparkContext:
         >>> def map_func(x):
         ...     sleep(100)
         ...     raise RuntimeError("Task should have been cancelled")
+        ...
         >>> def start_job(x):
         ...     global result
         ...     try:
@@ -2148,9 +2150,11 @@ class SparkContext:
         ...     except Exception as e:
         ...         result = "Cancelled"
         ...     lock.release()
+        ...
         >>> def stop_job():
         ...     sleep(5)
         ...     sc.cancelJobGroup("job_to_cancel")
+        ...
         >>> suppress = lock.acquire()
         >>> suppress = InheritableThread(target=start_job, args=(10,)).start()
         >>> suppress = InheritableThread(target=stop_job).start()
diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
index bce4101df1e..89b05b692ea 100644
--- a/python/pyspark/ml/functions.py
+++ b/python/pyspark/ml/functions.py
@@ -512,11 +512,10 @@ def predict_batch_udf(
         ...         # outputs.shape = [batch_size]
         ...         return inputs * 2
         ...     return predict
-        >>>
+        ...
         >>> times_two_udf = predict_batch_udf(make_times_two_fn,
         ...                                   return_type=FloatType(),
         ...                                   batch_size=10)
-        >>>
         >>> df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
         >>> df.withColumn("x2", times_two_udf("0")).show(5)
         +---+---+
@@ -561,12 +560,11 @@ def predict_batch_udf(
         ...         # outputs.shape = [batch_size]
         ...         return np.sum(inputs, axis=1)
         ...     return predict
-        >>>
+        ...
         >>> sum_udf = predict_batch_udf(make_sum_fn,
         ...                             return_type=FloatType(),
         ...                             batch_size=10,
         ...                             input_tensor_shapes=[[4]])
-        >>>
         >>> df.withColumn("sum", sum_udf(array("a", "b", "c", "d"))).show(5)
         +----+----+----+----+----+
         |   a|   b|   c|   d| sum|
@@ -591,11 +589,10 @@ def predict_batch_udf(
         ...         # outputs.shape = [batch_size]
         ...         return x1 + x2 + x3 + x4
         ...     return predict
-        >>>
+        ...
         >>> sum_udf = predict_batch_udf(make_sum_fn,
         ...                             return_type=FloatType(),
         ...                             batch_size=10)
-        >>>
         >>> df.withColumn("sum", sum_udf("a", "b", "c", "d")).show(5)
         +----+----+----+----+----+
         |   a|   b|   c|   d| sum|
@@ -643,14 +640,13 @@ def predict_batch_udf(
         ...         # outputs.shape = [batch_size]
         ...         return np.sum(x1, axis=1) + np.sum(x2, axis=1)
         ...     return predict
-        >>>
+        ...
         >>> multi_sum_udf = predict_batch_udf(
         ...     make_multi_sum_fn,
         ...     return_type=FloatType(),
         ...     batch_size=5,
         ...     input_tensor_shapes=[[4], [3]],
         ... )
-        >>>
         >>> df.withColumn("sum", multi_sum_udf("t1", "t2")).show(5)
         +--------------------+------------------+-----+
         |                  t1|                t2|  sum|
@@ -676,7 +672,7 @@ def predict_batch_udf(
         ...             "sum2": np.sum(x2, axis=1)
         ...         }
         ...     return predict_columnar
-        >>>
+        ...
         >>> multi_sum_udf = predict_batch_udf(
         ...     make_multi_sum_fn,
         ...     return_type=StructType([
@@ -686,7 +682,6 @@ def predict_batch_udf(
         ...     batch_size=5,
         ...     input_tensor_shapes=[[4], [3]],
         ... )
-        >>>
         >>> df.withColumn("preds", multi_sum_udf("t1", "t2")).select("t1", "t2", "preds.*").show(5)
         +--------------------+------------------+----+----+
         |                  t1|                t2|sum1|sum2|
@@ -705,7 +700,7 @@ def predict_batch_udf(
         ...         # x2.shape = [batch_size, 3]
         ...         return [{'sum1': np.sum(x1[i]), 'sum2': np.sum(x2[i])} for i in range(len(x1))]
         ...     return predict_row
-        >>>
+        ...
         >>> multi_sum_udf = predict_batch_udf(
         ...     make_multi_sum_fn,
         ...     return_type=StructType([
@@ -715,7 +710,6 @@ def predict_batch_udf(
         ...     batch_size=5,
         ...     input_tensor_shapes=[[4], [3]],
         ... )
-        >>>
         >>> df.withColumn("sum", multi_sum_udf("t1", "t2")).select("t1", "t2", "sum.*").show(5)
         +--------------------+------------------+----+----+
         |                  t1|                t2|sum1|sum2|
@@ -736,7 +730,7 @@ def predict_batch_udf(
         ...         # x2.shape = [batch_size, 3]
         ...         return {"t1x2": x1 * 2, "t2x2": x2 * 2}
         ...     return predict
-        >>>
+        ...
         >>> multi_times_two_udf = predict_batch_udf(
         ...     make_multi_times_two_fn,
         ...     return_type=StructType([
@@ -746,7 +740,6 @@ def predict_batch_udf(
         ...     batch_size=5,
         ...     input_tensor_shapes=[[4], [3]],
         ... )
-        >>>
         >>> df.withColumn("x2", multi_times_two_udf("t1", "t2")).select("t1", "t2", "x2.*").show(5)
         +--------------------+------------------+--------------------+------------------+
         |                  t1|                t2|                t1x2|              t2x2|
diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py
index 2ed70854cc6..d40fbc61766 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -319,6 +319,7 @@ class TorchDistributor(Distributor):
     ...     # ...
     ...     torch.destroy_process_group()
     ...     return model # or anything else
+    ...
     >>> distributor = TorchDistributor(
     ...     num_processes=2,
     ...     local_mode=True,
@@ -345,6 +346,7 @@ class TorchDistributor(Distributor):
     ...     trainer.fit()
     ...     # ...
     ...     return trainer
+    ...
     >>> distributor = TorchDistributor(
     ...     num_processes=num_proc,
     ...     local_mode=True,
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index bf8073c2a2e..89210a8e0a4 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -739,9 +739,11 @@ class PowerIterationClusteringModel(
     ...         theta = 2.0 * math.pi * i / n
     ...         points.append((r * math.cos(theta), r * math.sin(theta)))
     ...     return points
+    ...
     >>> def sim(x, y):
     ...     dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
     ...     return math.exp(-dist2 / 2.0)
+    ...
     >>> r1 = 1.0
     >>> n1 = 10
     >>> r2 = 4.0
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 37c06561f72..30d6c1556e7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -829,6 +829,7 @@ class RDD(Generic[T_co]):
         --------
         >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
         >>> def f(iterator): yield sum(iterator)
+        ...
         >>> rdd.mapPartitions(f).collect()
         [3, 7]
         """
@@ -874,6 +875,7 @@ class RDD(Generic[T_co]):
         --------
         >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
         >>> def f(splitIndex, iterator): yield splitIndex
+        ...
         >>> rdd.mapPartitionsWithIndex(f).sum()
         6
         """
@@ -917,6 +919,7 @@ class RDD(Generic[T_co]):
         --------
         >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
         >>> def f(splitIndex, iterator): yield splitIndex
+        ...
         >>> rdd.mapPartitionsWithSplit(f).sum()
         6
         """
@@ -1748,6 +1751,7 @@ class RDD(Generic[T_co]):
         Examples
         --------
         >>> def f(x): print(x)
+        ...
         >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
         """
         f = fail_on_stopiteration(f)
@@ -1781,6 +1785,7 @@ class RDD(Generic[T_co]):
         >>> def f(iterator):
         ...     for x in iterator:
         ...          print(x)
+        ...
         >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
         """
 
@@ -4207,6 +4212,7 @@ class RDD(Generic[T_co]):
         --------
         >>> rdd = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
         >>> def f(x): return x
+        ...
         >>> rdd.flatMapValues(f).collect()
         [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
         """
@@ -4243,6 +4249,7 @@ class RDD(Generic[T_co]):
         --------
         >>> rdd = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
         >>> def f(x): return len(x)
+        ...
         >>> rdd.mapValues(f).collect()
         [('a', 3), ('b', 1)]
         """
@@ -5321,6 +5328,7 @@ class RDDBarrier(Generic[T]):
         --------
         >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
         >>> def f(iterator): yield sum(iterator)
+        ...
         >>> barrier = rdd.barrier()
         >>> barrier
         <pyspark.rdd.RDDBarrier ...>
@@ -5372,6 +5380,7 @@ class RDDBarrier(Generic[T]):
         --------
         >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
         >>> def f(splitIndex, iterator): yield splitIndex
+        ...
         >>> barrier = rdd.barrier()
         >>> barrier
         <pyspark.rdd.RDDBarrier ...>
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 884dc997792..046139f5952 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1418,6 +1418,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
         >>> def func(person):
         ...     print(person.name)
+        ...
         >>> df.foreach(func)
         """
         self.rdd.foreach(f)
@@ -1442,6 +1443,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         >>> def func(itr):
         ...     for person in itr:
         ...         print(person.name)
+        ...
         >>> df.foreachPartition(func)
         """
         self.rdd.foreachPartition(f)  # type: ignore[arg-type]
@@ -5334,8 +5336,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         >>> df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["int", "float"])
         >>> def cast_all_to_int(input_df):
         ...     return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])
+        ...
         >>> def sort_columns_asc(input_df):
         ...     return input_df.select(*sorted(input_df.columns))
+        ...
         >>> df.transform(cast_all_to_int).transform(sort_columns_asc).show()
         +-----+---+
         |float|int|
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index d970fc0bb1e..530349c39dd 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -11828,6 +11828,7 @@ def transform(
 
     >>> def alternate(x, i):
     ...     return when(i % 2 == 0, x).otherwise(-x)
+    ...
     >>> df.select(transform("values", alternate).alias("alternated")).show()
     +--------------+
     |    alternated|
@@ -11978,6 +11979,7 @@ def filter(
     ... )
     >>> def after_second_quarter(x):
     ...     return month(to_date(x)) > 6
+    ...
     >>> df.select(
     ...     filter("values", after_second_quarter).alias("after_second_quarter")
     ... ).show(truncate=False)
@@ -12044,6 +12046,7 @@ def aggregate(
     ...     count = acc.count + 1
     ...     sum = acc.sum + x
     ...     return struct(count.alias("count"), sum.alias("sum"))
+    ...
     >>> df.select(
     ...     aggregate(
     ...         "values",
@@ -12116,6 +12119,7 @@ def reduce(
     ...     count = acc.count + 1
     ...     sum = acc.sum + x
     ...     return struct(count.alias("count"), sum.alias("sum"))
+    ...
     >>> df.select(
     ...     reduce(
     ...         "values",
diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py
index 857ecd2e582..56403482b9d 100644
--- a/python/pyspark/sql/pandas/group_ops.py
+++ b/python/pyspark/sql/pandas/group_ops.py
@@ -72,6 +72,7 @@ class PandasGroupedOpsMixin:
         ... def normalize(pdf):
         ...     v = pdf.v
         ...     return pdf.assign(v=(v - v.mean()) / v.std())
+        ...
         >>> df.groupby("id").apply(normalize).show()  # doctest: +SKIP
         +---+-------------------+
         | id|                  v|
@@ -154,6 +155,7 @@ class PandasGroupedOpsMixin:
         >>> def normalize(pdf):
         ...     v = pdf.v
         ...     return pdf.assign(v=(v - v.mean()) / v.std())
+        ...
         >>> df.groupby("id").applyInPandas(
         ...     normalize, schema="id long, v double").show()  # doctest: +SKIP
         +---+-------------------+
@@ -180,6 +182,7 @@ class PandasGroupedOpsMixin:
         ...     # key is a tuple of one numpy.int64, which is the value
         ...     # of 'id' for the current group
         ...     return pd.DataFrame([key + (pdf.v.mean(),)])
+        ...
         >>> df.groupby('id').applyInPandas(
         ...     mean_func, schema="id long, v double").show()  # doctest: +SKIP
         +---+---+
@@ -193,6 +196,7 @@ class PandasGroupedOpsMixin:
         ...     # key is a tuple of two numpy.int64s, which is the values
         ...     # of 'id' and 'ceil(df.v / 2)' for the current group
         ...     return pd.DataFrame([key + (pdf.v.sum(),)])
+        ...
         >>> df.groupby(df.id, ceil(df.v / 2)).applyInPandas(
         ...     sum_func, schema="id long, `ceil(v / 2)` long, v double").show()  # doctest: +SKIP
         +---+-----------+----+
@@ -433,6 +437,7 @@ class PandasCogroupedOps:
         ...     ("time", "id", "v2"))
         >>> def asof_join(l, r):
         ...     return pd.merge_asof(l, r, on="time", by="id")
+        ...
         >>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
         ...     asof_join, schema="time int, id int, v1 double, v2 string"
         ... ).show()  # doctest: +SKIP
@@ -456,6 +461,7 @@ class PandasCogroupedOps:
         ...         return pd.merge_asof(l, r, on="time", by="id")
         ...     else:
         ...         return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
+        ...
         >>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
         ...     asof_join, "time int, id int, v1 double, v2 string").show()  # doctest: +SKIP
         +--------+---+---+---+
diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py
index ac7a1acfcaa..6d18c93f019 100644
--- a/python/pyspark/sql/streaming/query.py
+++ b/python/pyspark/sql/streaming/query.py
@@ -588,6 +588,7 @@ class StreamingQueryManager:
         ...
         ...     def onQueryTerminated(self, event):
         ...         pass
+        ...
         >>> test_listener = TestListener()
 
         Register streaming query listener
@@ -635,6 +636,7 @@ class StreamingQueryManager:
         ...
         ...     def onQueryTerminated(self, event):
         ...         pass
+        ...
         >>> test_listener = TestListener()
 
         Register streaming query listener
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 9a73ba54e25..acc3f407f9d 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1331,6 +1331,7 @@ def _parse_datatype_json_string(json_string: str) -> DataType:
     ...     scala_datatype = spark._jsparkSession.parseDataType(datatype.json())
     ...     python_datatype = _parse_datatype_json_string(scala_datatype.json())
     ...     assert datatype == python_datatype
+    ...
     >>> for cls in _all_atomic_types.values():
     ...     if cls is not VarcharType and cls is not CharType:
     ...         check_datatype(cls())
diff --git a/python/pyspark/sql/udtf.py b/python/pyspark/sql/udtf.py
index e1b0825c170..95093970596 100644
--- a/python/pyspark/sql/udtf.py
+++ b/python/pyspark/sql/udtf.py
@@ -193,6 +193,7 @@ class UDTFRegistration:
         ... class PlusOne:
         ...     def eval(self, x: int):
         ...         yield x, x + 1
+        ...
         >>> _ = spark.udtf.register(name="plus_one", f=PlusOne)
         >>> spark.sql("SELECT * FROM plus_one(1)").collect()
         [Row(c1=1, c2=2)]

