Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/19 10:34:54 UTC
[spark] branch master updated: [SPARK-44096][PYTHON][DOCS] Make examples copy-pastable by adding a newline in all modules
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0fc7eeb39aa [SPARK-44096][PYTHON][DOCS] Make examples copy-pastable by adding a newline in all modules
0fc7eeb39aa is described below
commit 0fc7eeb39aad5997912c8a3f82aea089a4985898
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jun 19 13:34:42 2023 +0300
[SPARK-44096][PYTHON][DOCS] Make examples copy-pastable by adding a newline in all modules
### What changes were proposed in this pull request?
I found many more instances of the same issue fixed in https://github.com/apache/spark/pull/41655. This PR addresses all such examples across all PySpark components.
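For illustration only (this snippet is not taken from the PR), a doctest written like this cannot be pasted into a plain interactive Python shell: once the prompts are stripped by the docs' copy button, the interpreter is still inside the function block when the next statement arrives, and the standard REPL typically raises a SyntaxError instead of running the example.

    >>> def f(x):
    ...     return x + 1
    >>> f(1)
    2

Adding a bare `...` continuation line, which renders as a blank line and terminates the block, makes the same example copy-pastable:

    >>> def f(x):
    ...     return x + 1
    ...
    >>> f(1)
    2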
### Why are the changes needed?
See https://github.com/apache/spark/pull/41655.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the documentation and makes the examples copy-pastable; see also https://github.com/apache/spark/pull/41655.
### How was this patch tested?
CI in this PR should validate them. The change is logically the same as https://github.com/apache/spark/pull/41655. I will also build the documentation locally and test it.
Closes #41657 from HyukjinKwon/minor-newlines.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
python/pyspark/accumulators.py | 4 ++++
python/pyspark/context.py | 4 ++++
python/pyspark/ml/functions.py | 21 +++++++--------------
python/pyspark/ml/torch/distributor.py | 2 ++
python/pyspark/mllib/clustering.py | 2 ++
python/pyspark/rdd.py | 9 +++++++++
python/pyspark/sql/dataframe.py | 4 ++++
python/pyspark/sql/functions.py | 4 ++++
python/pyspark/sql/pandas/group_ops.py | 6 ++++++
python/pyspark/sql/streaming/query.py | 2 ++
python/pyspark/sql/types.py | 1 +
python/pyspark/sql/udtf.py | 1 +
12 files changed, 46 insertions(+), 14 deletions(-)
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index dc8520a844d..a95bd9debfc 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -88,12 +88,14 @@ class Accumulator(Generic[T]):
>>> def f(x):
... global a
... a += x
+ ...
>>> rdd.foreach(f)
>>> a.value
13
>>> b = sc.accumulator(0)
>>> def g(x):
... b.add(x)
+ ...
>>> rdd.foreach(g)
>>> b.value
6
@@ -106,6 +108,7 @@ class Accumulator(Generic[T]):
>>> def h(x):
... global a
... a.value = 7
+ ...
>>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
@@ -198,6 +201,7 @@ class AccumulatorParam(Generic[T]):
>>> def g(x):
... global va
... va += [x] * 3
+ ...
>>> rdd = sc.parallelize([1,2,3])
>>> rdd.foreach(g)
>>> va.value
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6f5094963be..51a4db67e8c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1802,6 +1802,7 @@ class SparkContext:
>>> def f(x):
... global acc
... acc += 1
+ ...
>>> rdd.foreach(f)
>>> acc.value
15
@@ -2140,6 +2141,7 @@ class SparkContext:
>>> def map_func(x):
... sleep(100)
... raise RuntimeError("Task should have been cancelled")
+ ...
>>> def start_job(x):
... global result
... try:
@@ -2148,9 +2150,11 @@ class SparkContext:
... except Exception as e:
... result = "Cancelled"
... lock.release()
+ ...
>>> def stop_job():
... sleep(5)
... sc.cancelJobGroup("job_to_cancel")
+ ...
>>> suppress = lock.acquire()
>>> suppress = InheritableThread(target=start_job, args=(10,)).start()
>>> suppress = InheritableThread(target=stop_job).start()
diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
index bce4101df1e..89b05b692ea 100644
--- a/python/pyspark/ml/functions.py
+++ b/python/pyspark/ml/functions.py
@@ -512,11 +512,10 @@ def predict_batch_udf(
... # outputs.shape = [batch_size]
... return inputs * 2
... return predict
- >>>
+ ...
>>> times_two_udf = predict_batch_udf(make_times_two_fn,
... return_type=FloatType(),
... batch_size=10)
- >>>
>>> df = spark.createDataFrame(pd.DataFrame(np.arange(100)))
>>> df.withColumn("x2", times_two_udf("0")).show(5)
+---+---+
@@ -561,12 +560,11 @@ def predict_batch_udf(
... # outputs.shape = [batch_size]
... return np.sum(inputs, axis=1)
... return predict
- >>>
+ ...
>>> sum_udf = predict_batch_udf(make_sum_fn,
... return_type=FloatType(),
... batch_size=10,
... input_tensor_shapes=[[4]])
- >>>
>>> df.withColumn("sum", sum_udf(array("a", "b", "c", "d"))).show(5)
+----+----+----+----+----+
| a| b| c| d| sum|
@@ -591,11 +589,10 @@ def predict_batch_udf(
... # outputs.shape = [batch_size]
... return x1 + x2 + x3 + x4
... return predict
- >>>
+ ...
>>> sum_udf = predict_batch_udf(make_sum_fn,
... return_type=FloatType(),
... batch_size=10)
- >>>
>>> df.withColumn("sum", sum_udf("a", "b", "c", "d")).show(5)
+----+----+----+----+----+
| a| b| c| d| sum|
@@ -643,14 +640,13 @@ def predict_batch_udf(
... # outputs.shape = [batch_size]
... return np.sum(x1, axis=1) + np.sum(x2, axis=1)
... return predict
- >>>
+ ...
>>> multi_sum_udf = predict_batch_udf(
... make_multi_sum_fn,
... return_type=FloatType(),
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
- >>>
>>> df.withColumn("sum", multi_sum_udf("t1", "t2")).show(5)
+--------------------+------------------+-----+
| t1| t2| sum|
@@ -676,7 +672,7 @@ def predict_batch_udf(
... "sum2": np.sum(x2, axis=1)
... }
... return predict_columnar
- >>>
+ ...
>>> multi_sum_udf = predict_batch_udf(
... make_multi_sum_fn,
... return_type=StructType([
@@ -686,7 +682,6 @@ def predict_batch_udf(
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
- >>>
>>> df.withColumn("preds", multi_sum_udf("t1", "t2")).select("t1", "t2", "preds.*").show(5)
+--------------------+------------------+----+----+
| t1| t2|sum1|sum2|
@@ -705,7 +700,7 @@ def predict_batch_udf(
... # x2.shape = [batch_size, 3]
... return [{'sum1': np.sum(x1[i]), 'sum2': np.sum(x2[i])} for i in range(len(x1))]
... return predict_row
- >>>
+ ...
>>> multi_sum_udf = predict_batch_udf(
... make_multi_sum_fn,
... return_type=StructType([
@@ -715,7 +710,6 @@ def predict_batch_udf(
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
- >>>
>>> df.withColumn("sum", multi_sum_udf("t1", "t2")).select("t1", "t2", "sum.*").show(5)
+--------------------+------------------+----+----+
| t1| t2|sum1|sum2|
@@ -736,7 +730,7 @@ def predict_batch_udf(
... # x2.shape = [batch_size, 3]
... return {"t1x2": x1 * 2, "t2x2": x2 * 2}
... return predict
- >>>
+ ...
>>> multi_times_two_udf = predict_batch_udf(
... make_multi_times_two_fn,
... return_type=StructType([
@@ -746,7 +740,6 @@ def predict_batch_udf(
... batch_size=5,
... input_tensor_shapes=[[4], [3]],
... )
- >>>
>>> df.withColumn("x2", multi_times_two_udf("t1", "t2")).select("t1", "t2", "x2.*").show(5)
+--------------------+------------------+--------------------+------------------+
| t1| t2| t1x2| t2x2|
diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py
index 2ed70854cc6..d40fbc61766 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -319,6 +319,7 @@ class TorchDistributor(Distributor):
... # ...
... torch.destroy_process_group()
... return model # or anything else
+ ...
>>> distributor = TorchDistributor(
... num_processes=2,
... local_mode=True,
@@ -345,6 +346,7 @@ class TorchDistributor(Distributor):
... trainer.fit()
... # ...
... return trainer
+ ...
>>> distributor = TorchDistributor(
... num_processes=num_proc,
... local_mode=True,
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index bf8073c2a2e..89210a8e0a4 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -739,9 +739,11 @@ class PowerIterationClusteringModel(
... theta = 2.0 * math.pi * i / n
... points.append((r * math.cos(theta), r * math.sin(theta)))
... return points
+ ...
>>> def sim(x, y):
... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
... return math.exp(-dist2 / 2.0)
+ ...
>>> r1 = 1.0
>>> n1 = 10
>>> r2 = 4.0
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 37c06561f72..30d6c1556e7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -829,6 +829,7 @@ class RDD(Generic[T_co]):
--------
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
+ ...
>>> rdd.mapPartitions(f).collect()
[3, 7]
"""
@@ -874,6 +875,7 @@ class RDD(Generic[T_co]):
--------
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
+ ...
>>> rdd.mapPartitionsWithIndex(f).sum()
6
"""
@@ -917,6 +919,7 @@ class RDD(Generic[T_co]):
--------
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
+ ...
>>> rdd.mapPartitionsWithSplit(f).sum()
6
"""
@@ -1748,6 +1751,7 @@ class RDD(Generic[T_co]):
Examples
--------
>>> def f(x): print(x)
+ ...
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
"""
f = fail_on_stopiteration(f)
@@ -1781,6 +1785,7 @@ class RDD(Generic[T_co]):
>>> def f(iterator):
... for x in iterator:
... print(x)
+ ...
>>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
"""
@@ -4207,6 +4212,7 @@ class RDD(Generic[T_co]):
--------
>>> rdd = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
>>> def f(x): return x
+ ...
>>> rdd.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
"""
@@ -4243,6 +4249,7 @@ class RDD(Generic[T_co]):
--------
>>> rdd = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
+ ...
>>> rdd.mapValues(f).collect()
[('a', 3), ('b', 1)]
"""
@@ -5321,6 +5328,7 @@ class RDDBarrier(Generic[T]):
--------
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
+ ...
>>> barrier = rdd.barrier()
>>> barrier
<pyspark.rdd.RDDBarrier ...>
@@ -5372,6 +5380,7 @@ class RDDBarrier(Generic[T]):
--------
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
+ ...
>>> barrier = rdd.barrier()
>>> barrier
<pyspark.rdd.RDDBarrier ...>
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 884dc997792..046139f5952 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1418,6 +1418,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
>>> def func(person):
... print(person.name)
+ ...
>>> df.foreach(func)
"""
self.rdd.foreach(f)
@@ -1442,6 +1443,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
>>> def func(itr):
... for person in itr:
... print(person.name)
+ ...
>>> df.foreachPartition(func)
"""
self.rdd.foreachPartition(f) # type: ignore[arg-type]
@@ -5334,8 +5336,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
>>> df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["int", "float"])
>>> def cast_all_to_int(input_df):
... return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])
+ ...
>>> def sort_columns_asc(input_df):
... return input_df.select(*sorted(input_df.columns))
+ ...
>>> df.transform(cast_all_to_int).transform(sort_columns_asc).show()
+-----+---+
|float|int|
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index d970fc0bb1e..530349c39dd 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -11828,6 +11828,7 @@ def transform(
>>> def alternate(x, i):
... return when(i % 2 == 0, x).otherwise(-x)
+ ...
>>> df.select(transform("values", alternate).alias("alternated")).show()
+--------------+
| alternated|
@@ -11978,6 +11979,7 @@ def filter(
... )
>>> def after_second_quarter(x):
... return month(to_date(x)) > 6
+ ...
>>> df.select(
... filter("values", after_second_quarter).alias("after_second_quarter")
... ).show(truncate=False)
@@ -12044,6 +12046,7 @@ def aggregate(
... count = acc.count + 1
... sum = acc.sum + x
... return struct(count.alias("count"), sum.alias("sum"))
+ ...
>>> df.select(
... aggregate(
... "values",
@@ -12116,6 +12119,7 @@ def reduce(
... count = acc.count + 1
... sum = acc.sum + x
... return struct(count.alias("count"), sum.alias("sum"))
+ ...
>>> df.select(
... reduce(
... "values",
diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py
index 857ecd2e582..56403482b9d 100644
--- a/python/pyspark/sql/pandas/group_ops.py
+++ b/python/pyspark/sql/pandas/group_ops.py
@@ -72,6 +72,7 @@ class PandasGroupedOpsMixin:
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
+ ...
>>> df.groupby("id").apply(normalize).show() # doctest: +SKIP
+---+-------------------+
| id| v|
@@ -154,6 +155,7 @@ class PandasGroupedOpsMixin:
>>> def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
+ ...
>>> df.groupby("id").applyInPandas(
... normalize, schema="id long, v double").show() # doctest: +SKIP
+---+-------------------+
@@ -180,6 +182,7 @@ class PandasGroupedOpsMixin:
... # key is a tuple of one numpy.int64, which is the value
... # of 'id' for the current group
... return pd.DataFrame([key + (pdf.v.mean(),)])
+ ...
>>> df.groupby('id').applyInPandas(
... mean_func, schema="id long, v double").show() # doctest: +SKIP
+---+---+
@@ -193,6 +196,7 @@ class PandasGroupedOpsMixin:
... # key is a tuple of two numpy.int64s, which is the values
... # of 'id' and 'ceil(df.v / 2)' for the current group
... return pd.DataFrame([key + (pdf.v.sum(),)])
+ ...
>>> df.groupby(df.id, ceil(df.v / 2)).applyInPandas(
... sum_func, schema="id long, `ceil(v / 2)` long, v double").show() # doctest: +SKIP
+---+-----------+----+
@@ -433,6 +437,7 @@ class PandasCogroupedOps:
... ("time", "id", "v2"))
>>> def asof_join(l, r):
... return pd.merge_asof(l, r, on="time", by="id")
+ ...
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
... asof_join, schema="time int, id int, v1 double, v2 string"
... ).show() # doctest: +SKIP
@@ -456,6 +461,7 @@ class PandasCogroupedOps:
... return pd.merge_asof(l, r, on="time", by="id")
... else:
... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
+ ...
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
... asof_join, "time int, id int, v1 double, v2 string").show() # doctest: +SKIP
+--------+---+---+---+
diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py
index ac7a1acfcaa..6d18c93f019 100644
--- a/python/pyspark/sql/streaming/query.py
+++ b/python/pyspark/sql/streaming/query.py
@@ -588,6 +588,7 @@ class StreamingQueryManager:
...
... def onQueryTerminated(self, event):
... pass
+ ...
>>> test_listener = TestListener()
Register streaming query listener
@@ -635,6 +636,7 @@ class StreamingQueryManager:
...
... def onQueryTerminated(self, event):
... pass
+ ...
>>> test_listener = TestListener()
Register streaming query listener
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 9a73ba54e25..acc3f407f9d 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1331,6 +1331,7 @@ def _parse_datatype_json_string(json_string: str) -> DataType:
... scala_datatype = spark._jsparkSession.parseDataType(datatype.json())
... python_datatype = _parse_datatype_json_string(scala_datatype.json())
... assert datatype == python_datatype
+ ...
>>> for cls in _all_atomic_types.values():
... if cls is not VarcharType and cls is not CharType:
... check_datatype(cls())
diff --git a/python/pyspark/sql/udtf.py b/python/pyspark/sql/udtf.py
index e1b0825c170..95093970596 100644
--- a/python/pyspark/sql/udtf.py
+++ b/python/pyspark/sql/udtf.py
@@ -193,6 +193,7 @@ class UDTFRegistration:
... class PlusOne:
... def eval(self, x: int):
... yield x, x + 1
+ ...
>>> _ = spark.udtf.register(name="plus_one", f=PlusOne)
>>> spark.sql("SELECT * FROM plus_one(1)").collect()
[Row(c1=1, c2=2)]