Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/02 23:13:40 UTC

[GitHub] [beam] robertwb opened a new pull request #11901: Prototype schema-inferring Row constructor.

robertwb opened a new pull request #11901:
URL: https://github.com/apache/beam/pull/11901


   This makes it easy to construct PCollections with schemas.
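A minimal, self-contained sketch of the idea. It assumes nothing about Beam's real internals: `Row` mirrors the `self.__dict__.update(kwargs)` constructor proposed in this PR, while `infer_schema` and `infer_common_schema` are hypothetical helpers that derive `(name, type)` field pairs from the values. The actual change infers a `RowTypeConstraint` through Beam's type inference; the names here are illustrative only, and the field ordering relies on Python 3.7+ keyword-argument ordering.

```python
from typing import List, Tuple


class Row(object):
  """Attribute bag mirroring the constructor proposed in this PR."""
  def __init__(self, **kwargs):
    self.__dict__.update(kwargs)

  def __repr__(self):
    items = ', '.join('%s=%r' % kv for kv in self.__dict__.items())
    return 'Row(%s)' % items


def infer_schema(row: Row) -> List[Tuple[str, type]]:
  """Hypothetical helper: one (field name, Python type) pair per attribute.

  Field order follows keyword-argument order, which Python 3.7+ preserves.
  """
  return [(name, type(value)) for name, value in vars(row).items()]


def infer_common_schema(rows: List[Row]) -> List[Tuple[str, type]]:
  """Hypothetical helper: require all rows to agree on a single schema."""
  schemas = {tuple(infer_schema(r)) for r in rows}
  if len(schemas) != 1:
    raise ValueError('Rows have inconsistent schemas: %s' % schemas)
  return list(schemas.pop())


rows = [Row(name='a', count=1), Row(name='b', count=2)]
print(infer_common_schema(rows))  # [('name', <class 'str'>), ('count', <class 'int'>)]
```

Inferring from values means users never spell out a schema by hand; the trade-off is that rows with mismatched field types must be rejected (or widened) rather than silently accepted.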
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r434245470



##########
File path: sdks/python/apache_beam/typehints/opcodes.py
##########
@@ -120,7 +120,7 @@ def get_iter(state, unused_arg):
 
 def symmetric_binary_op(state, unused_arg):
   # TODO(robertwb): This may not be entirely correct...
-  b, a = state.stack.pop(), state.stack.pop()
+  b, a = Const.unwrap(state.stack.pop()), Const.unwrap(state.stack.pop())

Review comment:
       It looks like this is an unrelated change that got pulled in?







[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r434734784



##########
File path: sdks/python/apache_beam/transforms/sql.py
##########
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
             SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+    self.__dict__.update(kwargs)

Review comment:
       I'd rather users not be importing stuff from `typehints`. Is there a better place? 







[GitHub] [beam] robertwb merged pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #11901:
URL: https://github.com/apache/beam/pull/11901


   





[GitHub] [beam] robertwb commented on pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11901:
URL: https://github.com/apache/beam/pull/11901#issuecomment-637855309


   @TheNeuralBit Not trying to overwhelm you, but I figured you'd be interested in this one.





[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437749425



##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -82,8 +86,19 @@ def from_runner_api_parameter(schema, components, unused_context):
     return RowCoder(schema)
 
   @staticmethod
-  def from_type_hint(named_tuple_type, registry):
-    return RowCoder(named_tuple_to_schema(named_tuple_type))
+  def from_type_hint(type_hint, registry):
+    if isinstance(type_hint, row_type.RowTypeConstraint):
+      schema = schema_pb2.Schema(
+          fields=[
+              schema_pb2.Field(
+                  name=name,
+                  type=typing_to_runner_api(type))
+              for (name, type) in type_hint._fields
+          ],
+          id=str(uuid.uuid4()))

Review comment:
       Done.







[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437749540



##########
File path: sdks/python/apache_beam/transforms/sql.py
##########
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
             SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+    self.__dict__.update(kwargs)

Review comment:
       Moved to `pvalue` (and imported at the top level), like `TaggedOutput`.







[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r434734464



##########
File path: sdks/python/apache_beam/typehints/opcodes.py
##########
@@ -120,7 +120,7 @@ def get_iter(state, unused_arg):
 
 def symmetric_binary_op(state, unused_arg):
   # TODO(robertwb): This may not be entirely correct...
-  b, a = state.stack.pop(), state.stack.pop()
+  b, a = Const.unwrap(state.stack.pop()), Const.unwrap(state.stack.pop())

Review comment:
       This was needed for my test. (It now correctly infers the type of `x + 1` as `int` when `x` is an `int`.) I thought it small enough not to merit a new PR.
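The effect of the `Const.unwrap` change in `symmetric_binary_op` can be illustrated with a simplified, self-contained sketch. `Const`, `State`, and both `symmetric_binary_op_*` functions below are hypothetical stand-ins, not Beam's `trivial_inference` or `opcodes` code. The assumption is that literals sit on the abstract stack wrapped in a constant object, so without unwrapping, `int` and a wrapped `1` compare unequal and the inferred result degrades to `Any`.

```python
from typing import Any, List


class Const(object):
  """Simplified stand-in for a constant on the abstract-interpretation stack."""
  def __init__(self, value):
    self.value = value
    self.type = type(value)

  @staticmethod
  def unwrap(x):
    # Replace a known constant by its type; leave plain types untouched.
    return x.type if isinstance(x, Const) else x


class State(object):
  """Hypothetical minimal interpreter state holding only a stack."""
  def __init__(self, stack: List):
    self.stack = stack


def symmetric_binary_op_old(state, unused_arg=None):
  # Before the change: a wrapped constant never equals a plain type.
  b, a = state.stack.pop(), state.stack.pop()
  state.stack.append(a if a == b else Any)


def symmetric_binary_op_new(state, unused_arg=None):
  # After the change: constants are reduced to their types before comparing.
  b, a = Const.unwrap(state.stack.pop()), Const.unwrap(state.stack.pop())
  state.stack.append(a if a == b else Any)


# Abstract stack for `x + 1` where `x` is an int.
old = State([int, Const(1)])
symmetric_binary_op_old(old)
print(old.stack)  # [typing.Any]

new = State([int, Const(1)])
symmetric_binary_op_new(new)
print(new.stack)  # [<class 'int'>]
```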







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r434246368



##########
File path: sdks/python/apache_beam/transforms/sql.py
##########
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
             SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+    self.__dict__.update(kwargs)

Review comment:
       This is definitely useful for SQL but I don't think it should be SQL-specific. What about putting it in `apache_beam/typehints/schemas.py`?







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437536115



##########
File path: sdks/python/apache_beam/transforms/sql.py
##########
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
             SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+    self.__dict__.update(kwargs)

Review comment:
       Mm, nothing comes to mind. I suppose it could just be `apache_beam.Row` for now, and we can alias it if we add a schema package with other top-level schema stuff later.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -82,8 +86,19 @@ def from_runner_api_parameter(schema, components, unused_context):
     return RowCoder(schema)
 
   @staticmethod
-  def from_type_hint(named_tuple_type, registry):
-    return RowCoder(named_tuple_to_schema(named_tuple_type))
+  def from_type_hint(type_hint, registry):
+    if isinstance(type_hint, row_type.RowTypeConstraint):
+      schema = schema_pb2.Schema(
+          fields=[
+              schema_pb2.Field(
+                  name=name,
+                  type=typing_to_runner_api(type))
+              for (name, type) in type_hint._fields
+          ],
+          id=str(uuid.uuid4()))

Review comment:
       Could you move this inference to `typehints.schemas` alongside `named_tuple_to_schema`? I have a WIP PR for batching schema'd PCollections that are inputs to DataFrames, and I should reuse this logic there.
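Factoring the `RowTypeConstraint` branch of `from_type_hint` into a shared helper, as requested here, might look roughly like the sketch below. It avoids protobufs so it runs standalone: `Field` and `Schema` are hypothetical stand-ins for `schema_pb2.Field` and `schema_pb2.Schema`, `fields_to_schema` is a hypothetical name, and the `_PRIMITIVES` table is an assumed, much-reduced substitute for `typing_to_runner_api`. As in the diff, each schema gets a fresh UUID as its `id`.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Field:
  """Stand-in for schema_pb2.Field (hypothetical)."""
  name: str
  type: str


@dataclass
class Schema:
  """Stand-in for schema_pb2.Schema (hypothetical); fresh UUID per schema."""
  fields: List[Field]
  id: str = field(default_factory=lambda: str(uuid.uuid4()))


# Assumed, reduced mapping standing in for typing_to_runner_api.
_PRIMITIVES = {
    int: 'INT64', float: 'DOUBLE', str: 'STRING', bytes: 'BYTES',
    bool: 'BOOLEAN',
}


def _to_schema_type(typ):
  try:
    return _PRIMITIVES[typ]
  except KeyError:
    raise ValueError('Unsupported field type: %r' % (typ,))


def fields_to_schema(fields: List[Tuple[str, type]]) -> Schema:
  """Build a schema from (name, type) pairs, e.g. a RowTypeConstraint's _fields."""
  return Schema(
      fields=[Field(name=name, type=_to_schema_type(typ))
              for name, typ in fields])


schema = fields_to_schema([('name', str), ('count', int)])
print([(f.name, f.type) for f in schema.fields])  # [('name', 'STRING'), ('count', 'INT64')]
```

Keeping the conversion in one function would let `RowCoder.from_type_hint` and a DataFrame batching path call the same code instead of each duplicating the field loop.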







[GitHub] [beam] robertwb commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437080538



##########
File path: sdks/python/apache_beam/transforms/sql.py
##########
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
             SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+    self.__dict__.update(kwargs)

Review comment:
       Is there going to be any other top-level schema stuff that it would make sense for this to be a sibling to? Otherwise, perhaps I could just put it at the top level, and one would use it as `apache_beam.Row`.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11901: Prototype schema-inferring Row constructor.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r434772179



##########
File path: sdks/python/apache_beam/transforms/sql.py
##########
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
             SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+    self.__dict__.update(kwargs)

Review comment:
       Hm yeah that makes sense. Maybe we need a new file or package for user-facing schema tools?



