You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ka...@apache.org on 2020/03/03 13:36:17 UTC

[beam] 01/01: Revert "[BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. (#10915)"

This is an automated email from the ASF dual-hosted git repository.

kamilwu pushed a commit to branch revert-10915-todataframe
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d5e23e2183b64ef8ebcceba36788c45c827ff9f5
Author: Kamil Wasilewski <ka...@poczta.fm>
AuthorDate: Tue Mar 3 14:36:07 2020 +0100

    Revert "[BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. (#10915)"
    
    This reverts commit a29fdff8c089cb5734019f373d0825ddd515befb.
---
 .../apache_beam/runners/interactive/utils.py       | 52 --------------
 .../apache_beam/runners/interactive/utils_test.py  | 84 ----------------------
 2 files changed, 136 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
deleted file mode 100644
index cc52bf3..0000000
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Utilities to be used in  Interactive Beam.
-"""
-
-from __future__ import absolute_import
-
-import pandas as pd
-
-from apache_beam.utils.windowed_value import WindowedValue
-
-
-def elements_to_df(elements, include_window_info=False):
-  """Parses the given elements into a Dataframe.
-
-  If the elements are a list of `WindowedValue`s, then it will break out the
-  elements into their own DataFrame and return it. If include_window_info is
-  True, then it will concatenate the windowing information onto the elements
-  DataFrame.
-  """
-
-  rows = []
-  windowed_info = []
-  for e in elements:
-    rows.append(e.value)
-    if include_window_info:
-      windowed_info.append([e.timestamp.micros, e.windows, e.pane_info])
-
-  rows_df = pd.DataFrame(rows)
-  if include_window_info:
-    windowed_info_df = pd.DataFrame(
-        windowed_info, columns=['event_time', 'windows', 'pane_info'])
-    final_df = pd.concat([rows_df, windowed_info_df], axis=1)
-  else:
-    final_df = rows_df
-
-  return final_df
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
deleted file mode 100644
index f9c08c8..0000000
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-
-import unittest
-
-import numpy as np
-import pandas as pd
-
-from apache_beam.runners.interactive import utils
-from apache_beam.utils.windowed_value import WindowedValue
-
-
-class ParseToDataframeTest(unittest.TestCase):
-  def test_parse_windowedvalue(self):
-    """Tests that WindowedValues are supported but not present.
-    """
-    from apache_beam.transforms.window import GlobalWindow
-
-    els = [
-        WindowedValue(('a', 2), 1, [GlobalWindow()]),
-        WindowedValue(('b', 3), 1, [GlobalWindow()])
-    ]
-
-    actual_df = utils.elements_to_df(els, include_window_info=False)
-    expected_df = pd.DataFrame([['a', 2], ['b', 3]], columns=[0, 1])
-    pd.testing.assert_frame_equal(actual_df, expected_df)
-
-  def test_parse_windowedvalue_with_window_info(self):
-    """Tests that WindowedValues are supported and have their own columns.
-    """
-    from apache_beam.transforms.window import GlobalWindow
-
-    els = [
-        WindowedValue(('a', 2), 1, [GlobalWindow()]),
-        WindowedValue(('b', 3), 1, [GlobalWindow()])
-    ]
-
-    actual_df = utils.elements_to_df(els, include_window_info=True)
-    expected_df = pd.DataFrame(
-        [['a', 2, int(1e6), els[0].windows, els[0].pane_info],
-         ['b', 3, int(1e6), els[1].windows, els[1].pane_info]],
-        columns=[0, 1, 'event_time', 'windows', 'pane_info'])
-    pd.testing.assert_frame_equal(actual_df, expected_df)
-
-  def test_parse_windowedvalue_with_dicts(self):
-    """Tests that dicts play well with WindowedValues.
-    """
-    from apache_beam.transforms.window import GlobalWindow
-
-    els = [
-        WindowedValue({
-            'b': 2, 'd': 4
-        }, 1, [GlobalWindow()]),
-        WindowedValue({
-            'a': 1, 'b': 2, 'c': 3
-        }, 1, [GlobalWindow()])
-    ]
-
-    actual_df = utils.elements_to_df(els, include_window_info=True)
-    expected_df = pd.DataFrame(
-        [[np.nan, 2, np.nan, 4, int(1e6), els[0].windows, els[0].pane_info],
-         [1, 2, 3, np.nan, int(1e6), els[1].windows, els[1].pane_info]],
-        columns=['a', 'b', 'c', 'd', 'event_time', 'windows', 'pane_info'])
-    pd.testing.assert_frame_equal(actual_df, expected_df)
-
-
-if __name__ == '__main__':
-  unittest.main()