You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2017/02/14 17:57:53 UTC
spark git commit: [SPARK-18541][PYTHON] Add metadata parameter to
pyspark.sql.Column.alias()
Repository: spark
Updated Branches:
refs/heads/master e0eeb0f89 -> 7b64f7aa0
[SPARK-18541][PYTHON] Add metadata parameter to pyspark.sql.Column.alias()
## What changes were proposed in this pull request?
Add a `metadata` keyword parameter to `pyspark.sql.Column.alias()` to allow users to mix-in metadata while manipulating `DataFrame`s in `pyspark`. Without this, I believe it was necessary to pass back through `SparkSession.createDataFrame` each time a user wanted to manipulate `StructField.metadata` in `pyspark`.
This pull request also improves consistency between the Scala and Python APIs (i.e. I did not add any functionality that was not already in the Scala API).
Discussed ahead of time on JIRA with marmbrus
## How was this patch tested?
Added unit tests (and doc tests). Ran the pertinent tests manually.
Author: Sheamus K. Parkes <sh...@milliman.com>
Closes #16094 from shea-parkes/pyspark-column-alias-metadata.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b64f7aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b64f7aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b64f7aa
Branch: refs/heads/master
Commit: 7b64f7aa03a49adca5fcafe6fff422823b587514
Parents: e0eeb0f
Author: Sheamus K. Parkes <sh...@milliman.com>
Authored: Tue Feb 14 09:57:43 2017 -0800
Committer: Holden Karau <ho...@us.ibm.com>
Committed: Tue Feb 14 09:57:43 2017 -0800
----------------------------------------------------------------------
python/pyspark/sql/column.py | 26 +++++++++++++++++++++++---
python/pyspark/sql/tests.py | 10 ++++++++++
2 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7b64f7aa/python/pyspark/sql/column.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 73c8672..0df187a 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -17,6 +17,7 @@
import sys
import warnings
+import json
if sys.version >= '3':
basestring = str
@@ -303,19 +304,38 @@ class Column(object):
isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")
@since(1.3)
- def alias(self, *alias):
+ def alias(self, *alias, **kwargs):
"""
Returns this column aliased with a new name or names (in the case of expressions that
return more than one column, such as explode).
+ :param alias: strings of desired column names (collects all positional arguments passed)
+ :param metadata: a dict of information to be stored in ``metadata`` attribute of the
+ corresponding :class: `StructField` (optional, keyword only argument)
+
+ .. versionchanged:: 2.2
+ Added optional ``metadata`` argument.
+
>>> df.select(df.age.alias("age2")).collect()
[Row(age2=2), Row(age2=5)]
+ >>> df.select(df.age.alias("age3", metadata={'max': 99})).schema['age3'].metadata['max']
+ 99
"""
+ metadata = kwargs.pop('metadata', None)
+ assert not kwargs, 'Unexpected kwargs where passed: %s' % kwargs
+
+ sc = SparkContext._active_spark_context
if len(alias) == 1:
- return Column(getattr(self._jc, "as")(alias[0]))
+ if metadata:
+ jmeta = sc._jvm.org.apache.spark.sql.types.Metadata.fromJson(
+ json.dumps(metadata))
+ return Column(getattr(self._jc, "as")(alias[0], jmeta))
+ else:
+ return Column(getattr(self._jc, "as")(alias[0]))
else:
- sc = SparkContext._active_spark_context
+ if metadata:
+ raise ValueError('metadata can only be provided for a single column')
return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))
name = copy_func(alias, sinceversion=2.0, doc=":func:`name` is an alias for :func:`alias`.")
http://git-wip-us.apache.org/repos/asf/spark/blob/7b64f7aa/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7372167..62e1a8c 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -266,6 +266,9 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(result[0][0], "a")
self.assertEqual(result[0][1], "b")
+ with self.assertRaises(ValueError):
+ data.select(explode(data.mapfield).alias("a", "b", metadata={'max': 99})).count()
+
def test_and_in_expression(self):
self.assertEqual(4, self.df.filter((self.df.key <= 10) & (self.df.value <= "2")).count())
self.assertRaises(ValueError, lambda: (self.df.key <= 10) and (self.df.value <= "2"))
@@ -895,6 +898,13 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(self.testData, df.select(df.key, df.value).collect())
self.assertEqual([Row(value='1')], df.where(df.key == 1).select(df.value).collect())
+ def test_column_alias_metadata(self):
+ df = self.df
+ df_with_meta = df.select(df.key.alias('pk', metadata={'label': 'Primary Key'}))
+ self.assertEqual(df_with_meta.schema['pk'].metadata['label'], 'Primary Key')
+ with self.assertRaises(AssertionError):
+ df.select(df.key.alias('pk', metdata={'label': 'Primary Key'}))
+
def test_freqItems(self):
vals = [Row(a=1, b=-2.0) if i % 2 == 0 else Row(a=i, b=i * 1.0) for i in range(100)]
df = self.sc.parallelize(vals).toDF()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org