Posted to commits@spark.apache.org by ze...@apache.org on 2022/01/10 21:11:43 UTC
[spark] branch master updated: [SPARK-37852][PYTHON][INFRA] Enable flake8's E741 rule in PySpark
This is an automated email from the ASF dual-hosted git repository.
zero323 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd24b48 [SPARK-37852][PYTHON][INFRA] Enable flake8's E741 rule in PySpark
bd24b48 is described below
commit bd24b4884b804fc85a083f82b864823851d5980c
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jan 10 22:09:45 2022 +0100
[SPARK-37852][PYTHON][INFRA] Enable flake8's E741 rule in PySpark
### What changes were proposed in this pull request?
This PR enables flake8's [E741](https://www.flake8rules.com/rules/E741.html) rule in the PySpark codebase to comply with PEP 8 (https://www.python.org/dev/peps/pep-0008/#names-to-avoid).
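As a minimal illustration (not part of this commit) of what E741 flags and how the renames in the diff below resolve it:

# Flagged by E741: 'l' is easily confused with '1' and 'I'
def mean_of(l):
    return sum(l) / len(l)

# Compliant: a descriptive name removes the ambiguity
def mean_of_values(values):
    return sum(values) / len(values)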
### Why are the changes needed?
To comply with PEP 8.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Existing test cases should cover this change.
Closes #35152 from HyukjinKwon/enable-E741.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: zero323 <ms...@gmail.com>
---
dev/tox.ini | 9 ++---
examples/src/main/python/sql/arrow.py | 4 +-
python/pyspark/ml/linalg/__init__.py | 24 ++++++------
python/pyspark/ml/tests/test_param.py | 16 ++++----
python/pyspark/mllib/linalg/__init__.py | 24 ++++++------
python/pyspark/mllib/tests/test_linalg.py | 4 +-
python/pyspark/pandas/frame.py | 26 +++++++------
python/pyspark/pandas/namespace.py | 4 +-
python/pyspark/pandas/plot/matplotlib.py | 4 +-
python/pyspark/pandas/series.py | 12 +++---
python/pyspark/rddsampler.py | 4 +-
.../pyspark/sql/tests/test_pandas_cogrouped_map.py | 20 +++++-----
python/pyspark/sql/tests/test_serde.py | 12 +++---
python/pyspark/tests/test_conf.py | 8 ++--
python/pyspark/tests/test_rdd.py | 8 ++--
python/pyspark/tests/test_shuffle.py | 44 +++++++++++-----------
16 files changed, 112 insertions(+), 111 deletions(-)
diff --git a/dev/tox.ini b/dev/tox.ini
index 4047383..df4dfce 100644
--- a/dev/tox.ini
+++ b/dev/tox.ini
@@ -22,15 +22,14 @@ ignore =
# 1. Type hints with def are treated as redefinition (e.g., functions.log).
# 2. Some are used for testing.
F811,
+ # There are too many instances to fix. Ignored for now.
+ W503,
+ W504,
# Below rules should be enabled in the future.
E731,
- E741,
- W503,
- W504,
per-file-ignores =
- # F405 is ignored as shared.py is auto-generated.
- # E501 can be removed after SPARK-37419.
+ # F405 and E501 are ignored as shared.py is auto-generated.
python/pyspark/ml/param/shared.py: F405 E501,
# Examples contain some unused variables.
examples/src/main/python/sql/datasource.py: F841,
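For reference, the updated ignore list can be exercised locally with a command along these lines (assuming flake8 is installed; the project's own lint wrapper script may differ):

flake8 --config=dev/tox.ini python/pyspark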
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index d30082b..298830c 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -258,8 +258,8 @@ def cogrouped_apply_in_pandas_example(spark):
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
- def asof_join(l, r):
- return pd.merge_asof(l, r, on="time", by="id")
+ def asof_join(left, right):
+ return pd.merge_asof(left, right, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py
index f940132..b361925 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -65,20 +65,20 @@ except BaseException:
_have_scipy = False
-def _convert_to_vector(l):
- if isinstance(l, Vector):
- return l
- elif type(l) in (array.array, np.array, np.ndarray, list, tuple, range):
- return DenseVector(l)
- elif _have_scipy and scipy.sparse.issparse(l):
- assert l.shape[1] == 1, "Expected column vector"
+def _convert_to_vector(d):
+ if isinstance(d, Vector):
+ return d
+ elif type(d) in (array.array, np.array, np.ndarray, list, tuple, range):
+ return DenseVector(d)
+ elif _have_scipy and scipy.sparse.issparse(d):
+ assert d.shape[1] == 1, "Expected column vector"
# Make sure the converted csc_matrix has sorted indices.
- csc = l.tocsc()
+ csc = d.tocsc()
if not csc.has_sorted_indices:
csc.sort_indices()
- return SparseVector(l.shape[0], csc.indices, csc.data)
+ return SparseVector(d.shape[0], csc.indices, csc.data)
else:
- raise TypeError("Cannot convert type %s into Vector" % type(l))
+ raise TypeError("Cannot convert type %s into Vector" % type(d))
def _vector_size(v):
@@ -125,8 +125,8 @@ def _format_float(f, digits=4):
return s
-def _format_float_list(l):
- return [_format_float(x) for x in l]
+def _format_float_list(xs):
+ return [_format_float(x) for x in xs]
def _double_to_long_bits(value):
diff --git a/python/pyspark/ml/tests/test_param.py b/python/pyspark/ml/tests/test_param.py
index e1a4267..64ed2f6 100644
--- a/python/pyspark/ml/tests/test_param.py
+++ b/python/pyspark/ml/tests/test_param.py
@@ -68,19 +68,19 @@ class ParamTypeConversionTests(PySparkTestCase):
self.assertRaises(TypeError, lambda: ElementwiseProduct(scalingVec=["a", "b"]))
def test_list(self):
- l = [0, 1]
+ lst = [0, 1]
for lst_like in [
- l,
- np.array(l),
- DenseVector(l),
- SparseVector(len(l), range(len(l)), l),
- pyarray.array("l", l),
+ lst,
+ np.array(lst),
+ DenseVector(lst),
+ SparseVector(len(lst), range(len(lst)), lst),
+ pyarray.array("l", lst),
range(2),
- tuple(l),
+ tuple(lst),
]:
converted = TypeConverters.toList(lst_like)
self.assertEqual(type(converted), list)
- self.assertListEqual(converted, l)
+ self.assertListEqual(converted, lst)
def test_list_int(self):
for indices in [
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py
index b29b96e..7d1818d 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -68,20 +68,20 @@ except BaseException:
_have_scipy = False
-def _convert_to_vector(l):
- if isinstance(l, Vector):
- return l
- elif type(l) in (array.array, np.array, np.ndarray, list, tuple, range):
- return DenseVector(l)
- elif _have_scipy and scipy.sparse.issparse(l):
- assert l.shape[1] == 1, "Expected column vector"
+def _convert_to_vector(d):
+ if isinstance(d, Vector):
+ return d
+ elif type(d) in (array.array, np.array, np.ndarray, list, tuple, range):
+ return DenseVector(d)
+ elif _have_scipy and scipy.sparse.issparse(d):
+ assert d.shape[1] == 1, "Expected column vector"
# Make sure the converted csc_matrix has sorted indices.
- csc = l.tocsc()
+ csc = d.tocsc()
if not csc.has_sorted_indices:
csc.sort_indices()
- return SparseVector(l.shape[0], csc.indices, csc.data)
+ return SparseVector(d.shape[0], csc.indices, csc.data)
else:
- raise TypeError("Cannot convert type %s into Vector" % type(l))
+ raise TypeError("Cannot convert type %s into Vector" % type(d))
def _vector_size(v):
@@ -128,8 +128,8 @@ def _format_float(f, digits=4):
return s
-def _format_float_list(l):
- return [_format_float(x) for x in l]
+def _format_float_list(xs):
+ return [_format_float(x) for x in xs]
def _double_to_long_bits(value):
diff --git a/python/pyspark/mllib/tests/test_linalg.py b/python/pyspark/mllib/tests/test_linalg.py
index d60396b..d25d2f2 100644
--- a/python/pyspark/mllib/tests/test_linalg.py
+++ b/python/pyspark/mllib/tests/test_linalg.py
@@ -523,8 +523,8 @@ class SciPyTests(MLlibTestCase):
self.assertEqual(sv, _convert_to_vector(lil.tocsr()))
self.assertEqual(sv, _convert_to_vector(lil.todok()))
- def serialize(l):
- return ser.loads(ser.dumps(_convert_to_vector(l)))
+ def serialize(d):
+ return ser.loads(ser.dumps(_convert_to_vector(d)))
self.assertEqual(sv, serialize(lil))
self.assertEqual(sv, serialize(lil.tocsc()))
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 9f743db..0a11a0f 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -3770,7 +3770,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
for lvl in level
]
- if all(isinstance(l, int) for l in level_list):
+ if all(isinstance(lvl, int) for lvl in level_list):
int_level_list = cast(List[int], level_list)
for lev in int_level_list:
if lev >= self._internal.index_level:
@@ -3782,9 +3782,9 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
idx = int_level_list
elif all(is_name_like_tuple(lev) for lev in level_list):
idx = []
- for l in cast(List[Label], level_list):
+ for label in cast(List[Label], level_list):
try:
- i = self._internal.index_names.index(l)
+ i = self._internal.index_names.index(label)
idx.append(i)
except ValueError:
if multi_index:
@@ -5305,8 +5305,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
reduce(
lambda x, y: x & y,
[
- scol == SF.lit(l)
- for l, scol in zip(lbl, internal.index_spark_columns)
+ scol == SF.lit(part)
+ for part, scol in zip(lbl, internal.index_spark_columns)
],
)
for lbl in labels
@@ -7033,7 +7033,9 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
if level is None or (is_list_like(level) and len(level) == 0): # type: ignore[arg-type]
by = self._internal.index_spark_columns
elif is_list_like(level):
- by = [self._internal.index_spark_columns[l] for l in level] # type: ignore[union-attr]
+ by = [
+ self._internal.index_spark_columns[lvl] for lvl in level # type: ignore[union-attr]
+ ]
else:
by = [self._internal.index_spark_columns[level]] # type: ignore[index]
@@ -12463,12 +12465,12 @@ def _reduce_spark_multi(sdf: SparkDataFrame, aggs: List[Column]) -> Any:
"""
assert isinstance(sdf, SparkDataFrame)
sdf0 = sdf.agg(*aggs)
- l = cast(pd.DataFrame, sdf0.limit(2).toPandas())
- assert len(l) == 1, (sdf, l)
- row = l.iloc[0]
- l2 = list(row)
- assert len(l2) == len(aggs), (row, l2)
- return l2
+ lst = cast(pd.DataFrame, sdf0.limit(2).toPandas())
+ assert len(lst) == 1, (sdf, lst)
+ row = lst.iloc[0]
+ lst2 = list(row)
+ assert len(lst2) == len(aggs), (row, lst2)
+ return lst2
class CachedDataFrame(DataFrame):
diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py
index 34649c5..ae0018c 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -3399,8 +3399,8 @@ def merge_asof(
left_join_on_columns = [scol_for(left_table, label) for label in left_join_on_names]
right_join_on_columns = [scol_for(right_table, label) for label in right_join_on_names]
on = reduce(
- lambda l, r: l & r,
- [l == r for l, r in zip(left_join_on_columns, right_join_on_columns)],
+ lambda lft, rgt: lft & rgt,
+ [lft == rgt for lft, rgt in zip(left_join_on_columns, right_join_on_columns)],
)
else:
on = None
diff --git a/python/pyspark/pandas/plot/matplotlib.py b/python/pyspark/pandas/plot/matplotlib.py
index c175149..6f54206 100644
--- a/python/pyspark/pandas/plot/matplotlib.py
+++ b/python/pyspark/pandas/plot/matplotlib.py
@@ -296,8 +296,8 @@ class PandasOnSparkBoxPlot(PandasBoxPlot, BoxPlotBase):
self.maybe_color_bp(bp)
self._return_obj = ret
- labels = [l for l, _ in self.data.items()]
- labels = [pprint_thing(l) for l in labels]
+ labels = [lbl for lbl, _ in self.data.items()]
+ labels = [pprint_thing(lbl) for lbl in labels]
if not self.use_index:
labels = [pprint_thing(key) for key in range(len(labels))]
self._set_ticklabels(ax, labels)
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index eba7a74..d403d87 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -6429,12 +6429,12 @@ def unpack_scalar(sdf: SparkDataFrame) -> Any:
Takes a dataframe that is supposed to contain a single row with a single scalar value,
and returns this value.
"""
- l = cast(pd.DataFrame, sdf.limit(2).toPandas())
- assert len(l) == 1, (sdf, l)
- row = l.iloc[0]
- l2 = list(row)
- assert len(l2) == 1, (row, l2)
- return l2[0]
+ lst = cast(pd.DataFrame, sdf.limit(2).toPandas())
+ assert len(lst) == 1, (sdf, lst)
+ row = lst.iloc[0]
+ lst2 = list(row)
+ assert len(lst2) == 1, (row, lst2)
+ return lst2[0]
@overload
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 11da447..bf82818 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -41,10 +41,10 @@ class RDDSamplerBase:
# http://en.wikipedia.org/wiki/Poisson_distribution
if mean < 20.0:
# one exp and k+1 random calls
- l = math.exp(-mean)
+ lda = math.exp(-mean)
p = self._random.random()
k = 0
- while p > l:
+ while p > lda:
k += 1
p *= self._random.random()
else:
diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 218e7cc..58022fa 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -88,8 +88,8 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
right_gdf = self.spark.createDataFrame(right).groupby(col("id") % 2 == 0)
- def merge_pandas(l, r):
- return pd.merge(l[["k", "v"]], r[["k", "v2"]], on=["k"])
+ def merge_pandas(lft, rgt):
+ return pd.merge(lft[["k", "v"]], rgt[["k", "v2"]], on=["k"])
result = (
left_gdf.cogroup(right_gdf)
@@ -106,8 +106,8 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
left = self.data1
right = self.data2
- def merge_pandas(l, r):
- return pd.merge(l, r, on=["id", "k"])
+ def merge_pandas(lft, rgt):
+ return pd.merge(lft, rgt, on=["id", "k"])
result = (
left.groupby()
@@ -157,8 +157,8 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
self._test_with_key(self.data1, right, isLeft=False)
def test_with_key_complex(self):
- def left_assign_key(key, l, _):
- return l.assign(key=key[0])
+ def left_assign_key(key, lft, _):
+ return lft.assign(key=key[0])
result = (
self.data1.groupby(col("id") % 2 == 0)
@@ -231,8 +231,8 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
@staticmethod
def _test_with_key(left, right, isLeft):
- def right_assign_key(key, l, r):
- return l.assign(key=key[0]) if isLeft else r.assign(key=key[0])
+ def right_assign_key(key, lft, rgt):
+ return lft.assign(key=key[0]) if isLeft else rgt.assign(key=key[0])
result = (
left.groupby("id")
@@ -248,8 +248,8 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
@staticmethod
def _test_merge(left, right, output_schema="id long, k int, v int, v2 int"):
- def merge_pandas(l, r):
- return pd.merge(l, r, on=["id", "k"])
+ def merge_pandas(lft, rgt):
+ return pd.merge(lft, rgt, on=["id", "k"])
result = (
left.groupby("id")
diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py
index 0c57400..4e9d347 100644
--- a/python/pyspark/sql/tests/test_serde.py
+++ b/python/pyspark/sql/tests/test_serde.py
@@ -28,17 +28,17 @@ from pyspark.testing.sqlutils import ReusedSQLTestCase, UTCOffsetTimezone
class SerdeTests(ReusedSQLTestCase):
def test_serialize_nested_array_and_map(self):
- d = [Row(l=[Row(a=1, b="s")], d={"key": Row(c=1.0, d="2")})]
+ d = [Row(lst=[Row(a=1, b="s")], d={"key": Row(c=1.0, d="2")})]
rdd = self.sc.parallelize(d)
df = self.spark.createDataFrame(rdd)
row = df.head()
- self.assertEqual(1, len(row.l))
- self.assertEqual(1, row.l[0].a)
+ self.assertEqual(1, len(row.lst))
+ self.assertEqual(1, row.lst[0].a)
self.assertEqual("2", row.d["key"].d)
- l = df.rdd.map(lambda x: x.l).first()
- self.assertEqual(1, len(l))
- self.assertEqual("s", l[0].b)
+ lst = df.rdd.map(lambda x: x.lst).first()
+ self.assertEqual(1, len(lst))
+ self.assertEqual("s", lst[0].b)
d = df.rdd.map(lambda x: x.d).first()
self.assertEqual(1, len(d))
diff --git a/python/pyspark/tests/test_conf.py b/python/pyspark/tests/test_conf.py
index 0ec7163..6a7c7a0 100644
--- a/python/pyspark/tests/test_conf.py
+++ b/python/pyspark/tests/test_conf.py
@@ -25,10 +25,10 @@ class ConfTests(unittest.TestCase):
memoryList = ["1T", "1G", "1M", "1024K"]
for memory in memoryList:
sc = SparkContext(conf=SparkConf().set("spark.python.worker.memory", memory))
- l = list(range(1024))
- random.shuffle(l)
- rdd = sc.parallelize(l, 4)
- self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect())
+ lst = list(range(1024))
+ random.shuffle(lst)
+ rdd = sc.parallelize(lst, 4)
+ self.assertEqual(sorted(lst), rdd.sortBy(lambda x: x).collect())
sc.stop()
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index 890f50b..5790cae 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -615,11 +615,11 @@ class RDDTests(ReusedPySparkTestCase):
def test_repartition_no_skewed(self):
num_partitions = 20
a = self.sc.parallelize(range(int(1000)), 2)
- l = a.repartition(num_partitions).glom().map(len).collect()
- zeros = len([x for x in l if x == 0])
+ xs = a.repartition(num_partitions).glom().map(len).collect()
+ zeros = len([x for x in xs if x == 0])
self.assertTrue(zeros == 0)
- l = a.coalesce(num_partitions, True).glom().map(len).collect()
- zeros = len([x for x in l if x == 0])
+ xs = a.coalesce(num_partitions, True).glom().map(len).collect()
+ zeros = len([x for x in xs if x == 0])
self.assertTrue(zeros == 0)
def test_repartition_on_textfile(self):
diff --git a/python/pyspark/tests/test_shuffle.py b/python/pyspark/tests/test_shuffle.py
index a470ca2..cea29c7 100644
--- a/python/pyspark/tests/test_shuffle.py
+++ b/python/pyspark/tests/test_shuffle.py
@@ -26,8 +26,8 @@ from pyspark.shuffle import Aggregator, ExternalMerger, ExternalSorter
class MergerTests(unittest.TestCase):
def setUp(self):
self.N = 1 << 12
- self.l = [i for i in range(self.N)]
- self.data = list(zip(self.l, self.l))
+ self.lst = [i for i in range(self.N)]
+ self.data = list(zip(self.lst, self.lst))
self.agg = Aggregator(
lambda x: [x], lambda x, y: x.append(y) or x, lambda x, y: x.extend(y) or x
)
@@ -81,8 +81,8 @@ class MergerTests(unittest.TestCase):
self.assertEqual(list(range(k)), list(vs))
ser = CPickleSerializer()
- l = ser.loads(ser.dumps(list(gen_gs(50002, 30000))))
- for k, vs in l:
+ lst = ser.loads(ser.dumps(list(gen_gs(50002, 30000))))
+ for k, vs in lst:
self.assertEqual(k, len(vs))
self.assertEqual(list(range(k)), list(vs))
@@ -119,15 +119,15 @@ class MergerTests(unittest.TestCase):
class SorterTests(unittest.TestCase):
def test_in_memory_sort(self):
- l = list(range(1024))
- random.shuffle(l)
+ lst = list(range(1024))
+ random.shuffle(lst)
sorter = ExternalSorter(1024)
- self.assertEqual(sorted(l), list(sorter.sorted(l)))
- self.assertEqual(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
- self.assertEqual(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
+ self.assertEqual(sorted(lst), list(sorter.sorted(lst)))
+ self.assertEqual(sorted(lst, reverse=True), list(sorter.sorted(lst, reverse=True)))
+ self.assertEqual(sorted(lst, key=lambda x: -x), list(sorter.sorted(lst, key=lambda x: -x)))
self.assertEqual(
- sorted(l, key=lambda x: -x, reverse=True),
- list(sorter.sorted(l, key=lambda x: -x, reverse=True)),
+ sorted(lst, key=lambda x: -x, reverse=True),
+ list(sorter.sorted(lst, key=lambda x: -x, reverse=True)),
)
def test_external_sort(self):
@@ -135,31 +135,31 @@ class SorterTests(unittest.TestCase):
def _next_limit(self):
return self.memory_limit
- l = list(range(1024))
- random.shuffle(l)
+ lst = list(range(1024))
+ random.shuffle(lst)
sorter = CustomizedSorter(1)
- self.assertEqual(sorted(l), list(sorter.sorted(l)))
+ self.assertEqual(sorted(lst), list(sorter.sorted(lst)))
self.assertGreater(shuffle.DiskBytesSpilled, 0)
last = shuffle.DiskBytesSpilled
- self.assertEqual(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
+ self.assertEqual(sorted(lst, reverse=True), list(sorter.sorted(lst, reverse=True)))
self.assertGreater(shuffle.DiskBytesSpilled, last)
last = shuffle.DiskBytesSpilled
- self.assertEqual(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
+ self.assertEqual(sorted(lst, key=lambda x: -x), list(sorter.sorted(lst, key=lambda x: -x)))
self.assertGreater(shuffle.DiskBytesSpilled, last)
last = shuffle.DiskBytesSpilled
self.assertEqual(
- sorted(l, key=lambda x: -x, reverse=True),
- list(sorter.sorted(l, key=lambda x: -x, reverse=True)),
+ sorted(lst, key=lambda x: -x, reverse=True),
+ list(sorter.sorted(lst, key=lambda x: -x, reverse=True)),
)
self.assertGreater(shuffle.DiskBytesSpilled, last)
def test_external_sort_in_rdd(self):
conf = SparkConf().set("spark.python.worker.memory", "1m")
sc = SparkContext(conf=conf)
- l = list(range(10240))
- random.shuffle(l)
- rdd = sc.parallelize(l, 4)
- self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect())
+ lst = list(range(10240))
+ random.shuffle(lst)
+ rdd = sc.parallelize(lst, 4)
+ self.assertEqual(sorted(lst), rdd.sortBy(lambda x: x).collect())
sc.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org