Posted to commits@beam.apache.org by yi...@apache.org on 2022/05/17 23:15:18 UTC

[beam] branch release-2.39.0 updated: Revert "[BEAM-14294] Worker changes to support trivial Batched DoFns (#17384)" (#17694)

This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.39.0 by this push:
     new 9b8ec451903 Revert "[BEAM-14294] Worker changes to support trivial Batched DoFns (#17384)" (#17694)
9b8ec451903 is described below

commit 9b8ec45190385c4b3195dcff455a0fb8d0c7b338
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue May 17 16:15:11 2022 -0700

    Revert "[BEAM-14294] Worker changes to support trivial Batched DoFns (#17384)" (#17694)
    
    This reverts commit 1c4418ce463468b4d5a5b53dfa6e9c35470b976b.
---
 sdks/python/apache_beam/coders/fast_coders_test.py |   2 +-
 sdks/python/apache_beam/runners/common.pxd         |  15 +-
 sdks/python/apache_beam/runners/common.py          | 505 +++++----------------
 .../portability/fn_api_runner/fn_runner_test.py    | 202 ---------
 .../runners/portability/portable_runner_test.py    |   5 -
 .../apache_beam/runners/worker/bundle_processor.py |  32 +-
 .../apache_beam/runners/worker/opcounters.pxd      |   2 -
 .../apache_beam/runners/worker/opcounters.py       |  17 +-
 .../apache_beam/runners/worker/operations.pxd      |  22 +-
 .../apache_beam/runners/worker/operations.py       | 282 ++----------
 .../apache_beam/transforms/batch_dofn_test.py      |  14 -
 sdks/python/apache_beam/transforms/combiners.py    |   4 +-
 .../apache_beam/transforms/ptransform_test.py      |   8 +-
 sdks/python/apache_beam/typehints/batch.py         |   6 -
 sdks/python/apache_beam/utils/windowed_value.pxd   |   8 -
 sdks/python/apache_beam/utils/windowed_value.py    | 129 +-----
 .../apache_beam/utils/windowed_value_test.py       |  92 ----
 17 files changed, 190 insertions(+), 1155 deletions(-)
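
For context on what is being backed out: the reverted change taught the
Python SDK worker to execute "Batched DoFns", i.e. DoFns that implement a
process_batch method consuming and producing whole batches (such as numpy
arrays) rather than one element at a time. A minimal sketch of that API,
adapted from the ArrayMultiplyDoFn test removed further down in this diff
(the pipeline wiring here is illustrative only):

    import apache_beam as beam
    import numpy as np
    from typing import Iterator

    class ArrayMultiplyDoFn(beam.DoFn):
      # Consumes and yields whole numpy arrays rather than single elements.
      def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
        yield batch * 2

      # infer_output_type must be defined (when there's no process method),
      # otherwise we don't know the input type is the same as output type.
      def infer_output_type(self, input_type):
        return input_type

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
              np.int64)
          | beam.ParDo(ArrayMultiplyDoFn())
          | beam.Map(print))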

diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py
index fa8643c2a38..c7112e0e484 100644
--- a/sdks/python/apache_beam/coders/fast_coders_test.py
+++ b/sdks/python/apache_beam/coders/fast_coders_test.py
@@ -29,7 +29,7 @@ from apache_beam.tools import utils
 class FastCoders(unittest.TestCase):
   def test_using_fast_impl(self):
     try:
-      utils.check_compiled('apache_beam.coders.coder_impl')
+      utils.check_compiled('apache_beam.coders')
     except RuntimeError:
       self.skipTest('Cython is not installed')
     # pylint: disable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 9a3d8d250b3..08de4b9c332 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -18,7 +18,6 @@
 cimport cython
 
 from apache_beam.utils.windowed_value cimport WindowedValue
-from apache_beam.utils.windowed_value cimport WindowedBatch
 from apache_beam.transforms.cy_dataflow_distribution_counter cimport DataflowDistributionCounter
 
 from libc.stdint cimport int64_t
@@ -29,15 +28,12 @@ cdef type TaggedOutput, TimestampedValue
 
 cdef class Receiver(object):
   cpdef receive(self, WindowedValue windowed_value)
-  cpdef receive_batch(self, WindowedBatch windowed_batch)
-  cpdef flush(self)
 
 
 cdef class MethodWrapper(object):
   cdef public object args
   cdef public object defaults
   cdef public object method_value
-  cdef str method_name
   cdef bint has_userstate_arguments
   cdef object state_args_to_replace
   cdef object timer_args_to_replace
@@ -54,7 +50,6 @@ cdef class MethodWrapper(object):
 
 cdef class DoFnSignature(object):
   cdef public MethodWrapper process_method
-  cdef public MethodWrapper process_batch_method
   cdef public MethodWrapper start_bundle_method
   cdef public MethodWrapper finish_bundle_method
   cdef public MethodWrapper setup_lifecycle_method
@@ -63,7 +58,6 @@ cdef class DoFnSignature(object):
   cdef public MethodWrapper initial_restriction_method
   cdef public MethodWrapper create_tracker_method
   cdef public MethodWrapper split_method
-  cdef public object batching_configuration
   cdef public object do_fn
   cdef public object timer_methods
   cdef bint _is_stateful_dofn
@@ -87,7 +81,6 @@ cdef class DoFnInvoker(object):
 
 cdef class SimpleInvoker(DoFnInvoker):
   cdef object process_method
-  cdef object process_batch_method
 
 
 cdef class PerWindowInvoker(DoFnInvoker):
@@ -95,14 +88,10 @@ cdef class PerWindowInvoker(DoFnInvoker):
   cdef DoFnContext context
   cdef list args_for_process
   cdef dict kwargs_for_process
-  cdef list placeholders_for_process
-  cdef list args_for_process_batch
-  cdef dict kwargs_for_process_batch
-  cdef list placeholders_for_process_batch
+  cdef list placeholders
   cdef bint has_windowed_inputs
   cdef bint cache_globally_windowed_args
   cdef object process_method
-  cdef object process_batch_method
   cdef bint is_splittable
   cdef object threadsafe_restriction_tracker
   cdef object threadsafe_watermark_estimator
@@ -136,8 +125,6 @@ cdef class _OutputProcessor(OutputProcessor):
   cdef Receiver main_receivers
   cdef object tagged_receivers
   cdef DataflowDistributionCounter per_element_output_counter
-  cdef object output_batch_converter
-
   @cython.locals(windowed_value=WindowedValue,
                  output_element_count=int64_t)
   cpdef process_outputs(self, WindowedValue element, results,
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 594a7e59cf5..7c1cf49d762 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -23,11 +23,9 @@ For internal use only; no backwards-compatibility guarantees.
 """
 
 # pytype: skip-file
-
 import sys
 import threading
 import traceback
-from enum import Enum
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Dict
@@ -56,12 +54,9 @@ from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
 from apache_beam.typehints import typehints
-from apache_beam.typehints.batch import BatchConverter
 from apache_beam.utils.counters import Counter
 from apache_beam.utils.counters import CounterName
 from apache_beam.utils.timestamp import Timestamp
-from apache_beam.utils.windowed_value import HomogeneousWindowedBatch
-from apache_beam.utils.windowed_value import WindowedBatch
 from apache_beam.utils.windowed_value import WindowedValue
 
 if TYPE_CHECKING:
@@ -115,13 +110,6 @@ class Receiver(object):
     # type: (WindowedValue) -> None
     raise NotImplementedError
 
-  def receive_batch(self, windowed_batch):
-    # type: (WindowedBatch) -> None
-    raise NotImplementedError
-
-  def flush(self):
-    raise NotImplementedError
-
 
 class MethodWrapper(object):
   """For internal use only; no backwards-compatibility guarantees.
@@ -148,7 +136,6 @@ class MethodWrapper(object):
 
     # TODO(BEAM-5878) support kwonlyargs on Python 3.
     self.method_value = getattr(obj_to_invoke, method_name)
-    self.method_name = method_name
 
     self.has_userstate_arguments = False
     self.state_args_to_replace = {}  # type: Dict[str, core.StateSpec]
@@ -227,26 +214,6 @@ class MethodWrapper(object):
       return self.method_value()
 
 
-class BatchingPreference(Enum):
-  DO_NOT_CARE = 1  # This operation can operate on batches or element-at-a-time
-  # TODO: Should we also store batching parameters here? (time/size preferences)
-  BATCH_REQUIRED = 2  # This operation can only operate on batches
-  BATCH_FORBIDDEN = 3  # This operation can only work element-at-a-time
-  # Other possibilities: BATCH_PREFERRED (with min batch size specified)
-
-  @property
-  def supports_batches(self) -> bool:
-    return self in (self.BATCH_REQUIRED, self.DO_NOT_CARE)
-
-  @property
-  def supports_elements(self) -> bool:
-    return self in (self.BATCH_FORBIDDEN, self.DO_NOT_CARE)
-
-  @property
-  def requires_batches(self) -> bool:
-    return self == self.BATCH_REQUIRED
-
-
 class DoFnSignature(object):
   """Represents the signature of a given ``DoFn`` object.
 
@@ -266,7 +233,6 @@ class DoFnSignature(object):
     self.do_fn = do_fn
 
     self.process_method = MethodWrapper(do_fn, 'process')
-    self.process_batch_method = MethodWrapper(do_fn, 'process_batch')
     self.start_bundle_method = MethodWrapper(do_fn, 'start_bundle')
     self.finish_bundle_method = MethodWrapper(do_fn, 'finish_bundle')
     self.setup_lifecycle_method = MethodWrapper(do_fn, 'setup')
@@ -313,55 +279,23 @@ class DoFnSignature(object):
   def _validate(self):
     # type: () -> None
     self._validate_process()
-    self._validate_process_batch()
     self._validate_bundle_method(self.start_bundle_method)
     self._validate_bundle_method(self.finish_bundle_method)
     self._validate_stateful_dofn()
 
-  def _check_duplicate_dofn_params(self, method: MethodWrapper):
-    param_ids = [
-        d.param_id for d in method.defaults if isinstance(d, core._DoFnParam)
-    ]
-    if len(param_ids) != len(set(param_ids)):
-      raise ValueError(
-          'DoFn %r has duplicate %s method parameters: %s.' %
-          (self.do_fn, method.method_name, param_ids))
-
   def _validate_process(self):
     # type: () -> None
 
     """Validate that none of the DoFnParameters are repeated in the function
     """
-    self._check_duplicate_dofn_params(self.process_method)
-
-  def _validate_process_batch(self):
-    # type: () -> None
-    self._check_duplicate_dofn_params(self.process_batch_method)
-
-    for d in self.process_batch_method.defaults:
-      if not isinstance(d, core._DoFnParam):
-        continue
-
-      # Helpful errors for params which will be supported in the future
-      if d == (core.DoFn.ElementParam):
-        # We currently assume we can just get the typehint from the first
-        # parameter. ElementParam breaks this assumption
-        raise NotImplementedError(
-            f"DoFn {self.do_fn!r} uses unsupported DoFn param ElementParam.")
-
-      if d in (core.DoFn.KeyParam, core.DoFn.StateParam, core.DoFn.TimerParam):
-        raise NotImplementedError(
-            f"DoFn {self.do_fn!r} has unsupported per-key DoFn param {d}. "
-            "Per-key DoFn params are not yet supported for process_batch "
-            "(BEAM-14409).")
-
-      # Fallback to catch anything not explicitly supported
-      if not d in (core.DoFn.WindowParam,
-                   core.DoFn.TimestampParam,
-                   core.DoFn.PaneInfoParam):
-        raise ValueError(
-            f"DoFn {self.do_fn!r} has unsupported process_batch "
-            f"method parameter {d}")
+    param_ids = [
+        d.param_id for d in self.process_method.defaults
+        if isinstance(d, core._DoFnParam)
+    ]
+    if len(param_ids) != len(set(param_ids)):
+      raise ValueError(
+          'DoFn %r has duplicate process method parameters: %s.' %
+          (self.do_fn, param_ids))
 
   def _validate_bundle_method(self, method_wrapper):
     """Validate that none of the DoFnParameters are used in the function
@@ -422,7 +356,7 @@ class DoFnInvoker(object):
   represented by a given DoFnSignature."""
 
   def __init__(self,
-               output_processor,  # type: _OutputProcessor
+               output_processor,  # type: OutputProcessor
                signature  # type: DoFnSignature
               ):
     # type: (...) -> None
@@ -442,7 +376,7 @@ class DoFnInvoker(object):
   @staticmethod
   def create_invoker(
       signature,  # type: DoFnSignature
-      output_processor,  # type: OutputProcessor
+      output_processor,  # type: _OutputProcessor
       context=None,  # type: Optional[DoFnContext]
       side_inputs=None,   # type: Optional[List[sideinputs.SideInputMap]]
       input_args=None, input_kwargs=None,
@@ -477,10 +411,10 @@ class DoFnInvoker(object):
                                 allows a callback to be registered.
     """
     side_inputs = side_inputs or []
+    default_arg_values = signature.process_method.defaults
     use_per_window_invoker = process_invocation and (
-        side_inputs or input_args or input_kwargs or
-        signature.process_method.defaults or
-        signature.process_batch_method.defaults or signature.is_stateful_dofn())
+        side_inputs or input_args or input_kwargs or default_arg_values or
+        signature.is_stateful_dofn())
     if not use_per_window_invoker:
       return SimpleInvoker(output_processor, signature)
     else:
@@ -523,26 +457,6 @@ class DoFnInvoker(object):
     """
     raise NotImplementedError
 
-  def invoke_process_batch(self,
-                     windowed_batch,  # type: WindowedBatch
-                     additional_args=None,
-                     additional_kwargs=None
-                    ):
-    # type: (...) -> None
-
-    """Invokes the DoFn.process() function.
-
-    Args:
-      windowed_batch: a WindowedBatch object that gives a batch of elements for
-                      which process_batch() method should be invoked, along with
-                      the window each element belongs to.
-      additional_args: additional arguments to be passed to the current
-                      `DoFn.process()` invocation, usually as side inputs.
-      additional_kwargs: additional keyword arguments to be passed to the
-                         current `DoFn.process()` invocation.
-    """
-    raise NotImplementedError
-
   def invoke_setup(self):
     # type: () -> None
 
@@ -610,7 +524,6 @@ class SimpleInvoker(DoFnInvoker):
     # type: (...) -> None
     super().__init__(output_processor, signature)
     self.process_method = signature.process_method.method_value
-    self.process_batch_method = signature.process_batch_method.method_value
 
   def invoke_process(self,
                      windowed_value,  # type: WindowedValue
@@ -624,94 +537,12 @@ class SimpleInvoker(DoFnInvoker):
         windowed_value, self.process_method(windowed_value.value))
     return []
 
-  def invoke_process_batch(self,
-                     windowed_batch,  # type: WindowedBatch
-                     restriction=None,
-                     watermark_estimator_state=None,
-                     additional_args=None,
-                     additional_kwargs=None
-                    ):
-    # type: (...) -> None
-    self.output_processor.process_batch_outputs(
-        windowed_batch, self.process_batch_method(windowed_batch.values))
-
-
-def _get_arg_placeholders(
-    method: MethodWrapper,
-    input_args: Optional[List[Any]],
-    input_kwargs: Optional[Dict[str, any]]):
-  input_args = input_args if input_args else []
-  input_kwargs = input_kwargs if input_kwargs else {}
-
-  arg_names = method.args
-  default_arg_values = method.defaults
-
-  # Create placeholder for element parameter of DoFn.process() method.
-  # Not to be confused with ArgumentPlaceHolder, which may be passed in
-  # input_args and is a placeholder for side-inputs.
-  class ArgPlaceholder(object):
-    def __init__(self, placeholder):
-      self.placeholder = placeholder
-
-  if all(core.DoFn.ElementParam != arg for arg in default_arg_values):
-    # TODO(BEAM-7867): Handle cases in which len(arg_names) ==
-    #   len(default_arg_values).
-    args_to_pick = len(arg_names) - len(default_arg_values) - 1
-    # Positional argument values for process(), with placeholders for special
-    # values such as the element, timestamp, etc.
-    args_with_placeholders = ([ArgPlaceholder(core.DoFn.ElementParam)] +
-                              input_args[:args_to_pick])
-  else:
-    args_to_pick = len(arg_names) - len(default_arg_values)
-    args_with_placeholders = input_args[:args_to_pick]
-
-  # Fill the OtherPlaceholders for context, key, window or timestamp
-  remaining_args_iter = iter(input_args[args_to_pick:])
-  for a, d in zip(arg_names[-len(default_arg_values):], default_arg_values):
-    if core.DoFn.ElementParam == d:
-      args_with_placeholders.append(ArgPlaceholder(d))
-    elif core.DoFn.KeyParam == d:
-      args_with_placeholders.append(ArgPlaceholder(d))
-    elif core.DoFn.WindowParam == d:
-      args_with_placeholders.append(ArgPlaceholder(d))
-    elif core.DoFn.TimestampParam == d:
-      args_with_placeholders.append(ArgPlaceholder(d))
-    elif core.DoFn.PaneInfoParam == d:
-      args_with_placeholders.append(ArgPlaceholder(d))
-    elif core.DoFn.SideInputParam == d:
-      # If no more args are present then the value must be passed via kwarg
-      try:
-        args_with_placeholders.append(next(remaining_args_iter))
-      except StopIteration:
-        if a not in input_kwargs:
-          raise ValueError("Value for sideinput %s not provided" % a)
-    elif isinstance(d, core.DoFn.StateParam):
-      args_with_placeholders.append(ArgPlaceholder(d))
-    elif isinstance(d, core.DoFn.TimerParam):
-      args_with_placeholders.append(ArgPlaceholder(d))
-    elif isinstance(d, type) and core.DoFn.BundleFinalizerParam == d:
-      args_with_placeholders.append(ArgPlaceholder(d))
-    else:
-      # If no more args are present then the value must be passed via kwarg
-      try:
-        args_with_placeholders.append(next(remaining_args_iter))
-      except StopIteration:
-        pass
-  args_with_placeholders.extend(list(remaining_args_iter))
-
-  # Stash the list of placeholder positions for performance
-  placeholders = [(i, x.placeholder)
-                  for (i, x) in enumerate(args_with_placeholders)
-                  if isinstance(x, ArgPlaceholder)]
-
-  return placeholders, args_with_placeholders, input_kwargs
-
 
 class PerWindowInvoker(DoFnInvoker):
   """An invoker that processes elements considering windowing information."""
 
   def __init__(self,
-               output_processor,  # type: OutputProcessor
+               output_processor,  # type: _OutputProcessor
                signature,  # type: DoFnSignature
                context,  # type: DoFnContext
                side_inputs,  # type: Iterable[sideinputs.SideInputMap]
@@ -726,45 +557,95 @@ class PerWindowInvoker(DoFnInvoker):
     self.process_method = signature.process_method.method_value
     default_arg_values = signature.process_method.defaults
     self.has_windowed_inputs = (
-        not all(si.is_globally_windowed() for si in side_inputs) or any(
-            core.DoFn.WindowParam == arg
-            for arg in signature.process_method.defaults) or any(
-                core.DoFn.WindowParam == arg
-                for arg in signature.process_batch_method.defaults) or
+        not all(si.is_globally_windowed() for si in side_inputs) or
+        any(core.DoFn.WindowParam == arg for arg in default_arg_values) or
         signature.is_stateful_dofn())
     self.user_state_context = user_state_context
     self.is_splittable = signature.is_splittable_dofn()
-    self.is_key_param_required = any(
-        core.DoFn.KeyParam == arg for arg in default_arg_values)
     self.threadsafe_restriction_tracker = None  # type: Optional[ThreadsafeRestrictionTracker]
     self.threadsafe_watermark_estimator = None  # type: Optional[ThreadsafeWatermarkEstimator]
     self.current_windowed_value = None  # type: Optional[WindowedValue]
     self.bundle_finalizer_param = bundle_finalizer_param
+    self.is_key_param_required = False
     if self.is_splittable:
       self.splitting_lock = threading.Lock()
       self.current_window_index = None
       self.stop_window_index = None
 
+    # Try to prepare all the arguments that can just be filled in
+    # without any additional work. in the process function.
+    # Also cache all the placeholders needed in the process function.
+
     # Flag to cache additional arguments on the first element if all
     # inputs are within the global window.
     self.cache_globally_windowed_args = not self.has_windowed_inputs
 
-    # Try to prepare all the arguments that can just be filled in
-    # without any additional work. in the process function.
-    # Also cache all the placeholders needed in the process function.
-    (
-        self.placeholders_for_process,
-        self.args_for_process,
-        self.kwargs_for_process) = _get_arg_placeholders(
-            signature.process_method, input_args, input_kwargs)
+    input_args = input_args if input_args else []
+    input_kwargs = input_kwargs if input_kwargs else {}
+
+    arg_names = signature.process_method.args
+
+    # Create placeholder for element parameter of DoFn.process() method.
+    # Not to be confused with ArgumentPlaceHolder, which may be passed in
+    # input_args and is a placeholder for side-inputs.
+    class ArgPlaceholder(object):
+      def __init__(self, placeholder):
+        self.placeholder = placeholder
+
+    if all(core.DoFn.ElementParam != arg for arg in default_arg_values):
+      # TODO(BEAM-7867): Handle cases in which len(arg_names) ==
+      #   len(default_arg_values).
+      args_to_pick = len(arg_names) - len(default_arg_values) - 1
+      # Positional argument values for process(), with placeholders for special
+      # values such as the element, timestamp, etc.
+      args_with_placeholders = ([ArgPlaceholder(core.DoFn.ElementParam)] +
+                                input_args[:args_to_pick])
+    else:
+      args_to_pick = len(arg_names) - len(default_arg_values)
+      args_with_placeholders = input_args[:args_to_pick]
+
+    # Fill the OtherPlaceholders for context, key, window or timestamp
+    remaining_args_iter = iter(input_args[args_to_pick:])
+    for a, d in zip(arg_names[-len(default_arg_values):], default_arg_values):
+      if core.DoFn.ElementParam == d:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif core.DoFn.KeyParam == d:
+        self.is_key_param_required = True
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif core.DoFn.WindowParam == d:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif core.DoFn.TimestampParam == d:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif core.DoFn.PaneInfoParam == d:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif core.DoFn.SideInputParam == d:
+        # If no more args are present then the value must be passed via kwarg
+        try:
+          args_with_placeholders.append(next(remaining_args_iter))
+        except StopIteration:
+          if a not in input_kwargs:
+            raise ValueError("Value for sideinput %s not provided" % a)
+      elif isinstance(d, core.DoFn.StateParam):
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif isinstance(d, core.DoFn.TimerParam):
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif isinstance(d, type) and core.DoFn.BundleFinalizerParam == d:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      else:
+        # If no more args are present then the value must be passed via kwarg
+        try:
+          args_with_placeholders.append(next(remaining_args_iter))
+        except StopIteration:
+          pass
+    args_with_placeholders.extend(list(remaining_args_iter))
 
-    self.process_batch_method = signature.process_batch_method.method_value
+    # Stash the list of placeholder positions for performance
+    self.placeholders = [(i, x.placeholder)
+                         for (i, x) in enumerate(args_with_placeholders)
+                         if isinstance(x, ArgPlaceholder)]
 
-    (
-        self.placeholders_for_process_batch,
-        self.args_for_process_batch,
-        self.kwargs_for_process_batch) = _get_arg_placeholders(
-            signature.process_batch_method, input_args, input_kwargs)
+    self.args_for_process = args_with_placeholders
+    self.kwargs_for_process = input_kwargs
 
   def invoke_process(self,
                      windowed_value,  # type: WindowedValue
@@ -838,33 +719,6 @@ class PerWindowInvoker(DoFnInvoker):
           windowed_value, additional_args, additional_kwargs)
     return residuals
 
-  def invoke_process_batch(self,
-                     windowed_batch,  # type: WindowedBatch
-                     additional_args=None,
-                     additional_kwargs=None
-                    ):
-    # type: (...) -> None
-
-    if not additional_args:
-      additional_args = []
-    if not additional_kwargs:
-      additional_kwargs = {}
-
-    assert isinstance(windowed_batch, HomogeneousWindowedBatch)
-
-    if self.has_windowed_inputs and len(windowed_batch.windows) != 1:
-      for w in windowed_batch.windows:
-        self._invoke_process_batch_per_window(
-            HomogeneousWindowedBatch.of(
-                windowed_batch.values,
-                windowed_batch.timestamp, (w, ),
-                windowed_batch.pane_info),
-            additional_args,
-            additional_kwargs)
-    else:
-      self._invoke_process_batch_per_window(
-          windowed_batch, additional_args, additional_kwargs)
-
   def _should_process_window_for_sdf(
       self,
       windowed_value, # type: WindowedValue
@@ -908,9 +762,7 @@ class PerWindowInvoker(DoFnInvoker):
                                  additional_kwargs,
                                 ):
     # type: (...) -> Optional[SplitResultResidual]
-
     if self.has_windowed_inputs:
-      assert len(windowed_value.windows) <= 1
       window, = windowed_value.windows
       side_inputs = [si[window] for si in self.side_inputs]
       side_inputs.extend(additional_args)
@@ -946,7 +798,7 @@ class PerWindowInvoker(DoFnInvoker):
             'Input value to a stateful DoFn or KeyParam must be a KV tuple; '
             'instead, got \'%s\'.') % (windowed_value.value, ))
 
-    for i, p in self.placeholders_for_process:
+    for i, p in self.placeholders:
       if core.DoFn.ElementParam == p:
         args_for_process[i] = windowed_value.value
       elif core.DoFn.KeyParam == p:
@@ -973,15 +825,23 @@ class PerWindowInvoker(DoFnInvoker):
       elif core.DoFn.BundleFinalizerParam == p:
         args_for_process[i] = self.bundle_finalizer_param
 
-    kwargs_for_process = kwargs_for_process or {}
-
     if additional_kwargs:
-      kwargs_for_process.update(additional_kwargs)
+      if kwargs_for_process is None:
+        kwargs_for_process = additional_kwargs
+      else:
+        for key in additional_kwargs:
+          kwargs_for_process[key] = additional_kwargs[key]
 
-    self.output_processor.process_outputs(
-        windowed_value,
-        self.process_method(*args_for_process, **kwargs_for_process),
-        self.threadsafe_watermark_estimator)
+    if kwargs_for_process:
+      self.output_processor.process_outputs(
+          windowed_value,
+          self.process_method(*args_for_process, **kwargs_for_process),
+          self.threadsafe_watermark_estimator)
+    else:
+      self.output_processor.process_outputs(
+          windowed_value,
+          self.process_method(*args_for_process),
+          self.threadsafe_watermark_estimator)
 
     if self.is_splittable:
       assert self.threadsafe_restriction_tracker is not None
@@ -1006,68 +866,6 @@ class PerWindowInvoker(DoFnInvoker):
             deferred_timestamp=deferred_timestamp)
     return None
 
-  def _invoke_process_batch_per_window(
-      self,
-      windowed_batch: WindowedBatch,
-      additional_args,
-      additional_kwargs,
-  ):
-    # type: (...) -> Optional[SplitResultResidual]
-
-    if self.has_windowed_inputs:
-      assert isinstance(windowed_batch, HomogeneousWindowedBatch)
-      assert len(windowed_batch.windows) <= 1
-
-      window, = windowed_batch.windows
-      side_inputs = [si[window] for si in self.side_inputs]
-      side_inputs.extend(additional_args)
-      (args_for_process_batch,
-       kwargs_for_process_batch) = util.insert_values_in_args(
-           self.args_for_process_batch,
-           self.kwargs_for_process_batch,
-           side_inputs)
-    elif self.cache_globally_windowed_args:
-      # Attempt to cache additional args if all inputs are globally
-      # windowed inputs when processing the first element.
-      self.cache_globally_windowed_args = False
-
-      # Fill in sideInputs if they are globally windowed
-      global_window = GlobalWindow()
-      self.args_for_process_batch, self.kwargs_for_process_batch = (
-          util.insert_values_in_args(
-              self.args_for_process_batch, self.kwargs_for_process_batch,
-              [si[global_window] for si in self.side_inputs]))
-      args_for_process_batch, kwargs_for_process_batch = (
-          self.args_for_process_batch, self.kwargs_for_process_batch)
-    else:
-      args_for_process_batch, kwargs_for_process_batch = (
-          self.args_for_process_batch, self.kwargs_for_process_batch)
-
-    for i, p in self.placeholders_for_process_batch:
-      if core.DoFn.ElementParam == p:
-        args_for_process_batch[i] = windowed_batch.values
-      elif core.DoFn.KeyParam == p:
-        raise NotImplementedError("BEAM-14409: Per-key process_batch")
-      elif core.DoFn.WindowParam == p:
-        args_for_process_batch[i] = window
-      elif core.DoFn.TimestampParam == p:
-        args_for_process_batch[i] = windowed_batch.timestamp
-      elif core.DoFn.PaneInfoParam == p:
-        assert isinstance(windowed_batch, HomogeneousWindowedBatch)
-        args_for_process_batch[i] = windowed_batch.pane_info
-      elif isinstance(p, core.DoFn.StateParam):
-        raise NotImplementedError("BEAM-14409: Per-key process_batch")
-      elif isinstance(p, core.DoFn.TimerParam):
-        raise NotImplementedError("BEAM-14409: Per-key process_batch")
-
-    kwargs_for_process_batch = kwargs_for_process_batch or {}
-
-    self.output_processor.process_batch_outputs(
-        windowed_batch,
-        self.process_batch_method(
-            *args_for_process_batch, **kwargs_for_process_batch),
-        self.threadsafe_watermark_estimator)
-
   @staticmethod
   def _try_split(fraction,
       window_index, # type: Optional[int]
@@ -1372,15 +1170,11 @@ class DoFnRunner:
     else:
       per_element_output_counter = None
 
-    # TODO(BEAM-14293): output processor assumes DoFns are batch-to-batch or
-    # element-to-element, @yields_batches and @yields_elements will break this
-    # assumption.
     output_processor = _OutputProcessor(
         windowing.windowfn,
         main_receivers,
         tagged_receivers,
-        per_element_output_counter,
-        getattr(fn, 'output_batch_converter', None))
+        per_element_output_counter)
 
     if do_fn_signature.is_stateful_dofn() and not user_state_context:
       raise Exception(
@@ -1406,13 +1200,6 @@ class DoFnRunner:
       self._reraise_augmented(exn)
       return []
 
-  def process_batch(self, windowed_batch):
-    # type: (WindowedBatch) -> None
-    try:
-      self.do_fn_invoker.invoke_process_batch(windowed_batch)
-    except BaseException as exn:
-      self._reraise_augmented(exn)
-
   def process_with_sized_restriction(self, windowed_value):
     # type: (WindowedValue) -> Iterable[SplitResultResidual]
     (element, (restriction, estimator_state)), _ = windowed_value.value
@@ -1500,11 +1287,6 @@ class OutputProcessor(object):
     # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
     raise NotImplementedError
 
-  def process_batch_outputs(
-      self, windowed_input_element, results, watermark_estimator=None):
-    # type: (WindowedBatch, Iterable[Any], Optional[WatermarkEstimator]) -> None
-    raise NotImplementedError
-
 
 class _OutputProcessor(OutputProcessor):
   """Processes output produced by DoFn method invocations."""
@@ -1513,9 +1295,7 @@ class _OutputProcessor(OutputProcessor):
                window_fn,
                main_receivers,  # type: Receiver
                tagged_receivers,  # type: Mapping[Optional[str], Receiver]
-               per_element_output_counter,
-               output_batch_converter, # type: Optional[BatchConverter]
-               ):
+               per_element_output_counter):
     """Initializes ``_OutputProcessor``.
 
     Args:
@@ -1528,12 +1308,7 @@ class _OutputProcessor(OutputProcessor):
     self.window_fn = window_fn
     self.main_receivers = main_receivers
     self.tagged_receivers = tagged_receivers
-    if (per_element_output_counter is not None and
-        per_element_output_counter.is_cythonized):
-      self.per_element_output_counter = per_element_output_counter
-    else:
-      self.per_element_output_counter = None
-    self.output_batch_converter = output_batch_converter
+    self.per_element_output_counter = per_element_output_counter
 
   def process_outputs(
       self, windowed_input_element, results, watermark_estimator=None):
@@ -1547,7 +1322,8 @@ class _OutputProcessor(OutputProcessor):
     if results is None:
       # TODO(BEAM-3937): Remove if block after output counter released.
       # Only enable per_element_output_counter when counter cythonized.
-      if self.per_element_output_counter is not None:
+      if (self.per_element_output_counter is not None and
+          self.per_element_output_counter.is_cythonized):
         self.per_element_output_counter.add_input(0)
       return
 
@@ -1585,75 +1361,10 @@ class _OutputProcessor(OutputProcessor):
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
-
-    # TODO(BEAM-3937): Remove if block after output counter released.
-    # Only enable per_element_output_counter when counter cythonized
-    if self.per_element_output_counter is not None:
-      self.per_element_output_counter.add_input(output_element_count)
-
-  def process_batch_outputs(
-      self, windowed_input_batch, results, watermark_estimator=None):
-    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
-
-    """Dispatch the result of process computation to the appropriate receivers.
-
-    A value wrapped in a TaggedOutput object will be unwrapped and
-    then dispatched to the appropriate indexed output.
-    """
-    if results is None:
-      # TODO(BEAM-3937): Remove if block after output counter released.
-      # Only enable per_element_output_counter when counter cythonized.
-      if self.per_element_output_counter is not None:
-        self.per_element_output_counter.add_input(0)
-      return
-
-    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
-    #  if performance_runtime_type_check is active, without harming performance
-
-    assert self.output_batch_converter is not None
-
-    output_element_count = 0
-    for result in results:
-      tag = None
-      if isinstance(result, TaggedOutput):
-        tag = result.tag
-        if not isinstance(tag, str):
-          raise TypeError('In %s, tag %s is not a string' % (self, tag))
-        result = result.value
-      if isinstance(result, (WindowedValue, TimestampedValue)):
-        raise TypeError(
-            f"Received {type(result).__name__} from DoFn that was "
-            "expected to produce a batch.")
-      if isinstance(result, WindowedBatch):
-        assert isinstance(result, HomogeneousWindowedBatch)
-        windowed_batch = result
-
-        if (windowed_input_batch is not None and
-            len(windowed_input_batch.windows) != 1):
-          windowed_batch.windows *= len(windowed_input_batch.windows)
-      # TODO(BEAM-14352): Add TimestampedBatch, an analogue for TimestampedValue
-      # and handle it here (see TimestampedValue logic in process_outputs).
-      else:
-        # TODO: This should error unless the DoFn was defined with
-        # @DoFn.yields_batches(output_aligned_with_input=True)
-        # We should consider also validating that the length is the same as
-        # windowed_input_batch
-        windowed_batch = windowed_input_batch.with_values(result)
-
-      output_element_count += self.output_batch_converter.get_length(
-          windowed_input_batch.values)
-
-      if watermark_estimator is not None:
-        for timestamp in windowed_batch.timestamps:
-          watermark_estimator.observe_timestamp(timestamp)
-      if tag is None:
-        self.main_receivers.receive_batch(windowed_batch)
-      else:
-        self.tagged_receivers[tag].receive_batch(windowed_batch)
-
     # TODO(BEAM-3937): Remove if block after output counter released.
     # Only enable per_element_output_counter when counter cythonized
-    if self.per_element_output_counter is not None:
+    if (self.per_element_output_counter is not None and
+        self.per_element_output_counter.is_cythonized):
       self.per_element_output_counter.add_input(output_element_count)
 
   def start_bundle_outputs(self, results):
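
(The bulk of the common.py hunks above revert a refactor that had extracted
the argument-placeholder bookkeeping into a shared _get_arg_placeholders
helper, so that process and process_batch could both use it; the revert
restores that logic inline in PerWindowInvoker.__init__. The pattern itself,
as a self-contained sketch with hypothetical names rather than Beam's actual
API: positions of "special" parameters are computed once at construction,
and only those slots are overwritten per element.)

    ELEMENT, TIMESTAMP = object(), object()  # stand-ins for DoFn.*Param

    def stash_placeholders(default_arg_values, side_input_values):
      # Computed once: argument list with side inputs filled in, plus the
      # positions that must be patched for every element.
      args = list(side_input_values)
      placeholders = []
      for d in default_arg_values:
        if d in (ELEMENT, TIMESTAMP):
          placeholders.append((len(args), d))
          args.append(None)  # slot to be filled per element
      return placeholders, args

    def invoke(process, placeholders, args, value, ts):
      # Per element: patch only the stashed positions, then call.
      for i, p in placeholders:
        args[i] = value if p is ELEMENT else ts
      return process(*args)
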
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 06016706d03..6daf025d2ad 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -32,13 +32,9 @@ import unittest
 import uuid
 from typing import Any
 from typing import Dict
-from typing import Iterator
-from typing import List
 from typing import Tuple
-from typing import no_type_check
 
 import hamcrest  # pylint: disable=ungrouped-imports
-import numpy as np
 import pytest
 from hamcrest.core.matcher import Matcher
 from hamcrest.core.string_description import StringDescription
@@ -63,17 +59,14 @@ from apache_beam.runners.portability.fn_api_runner import fn_runner
 from apache_beam.runners.sdf_utils import RestrictionTrackerView
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import statesampler
-from apache_beam.runners.worker.operations import InefficientExecutionWarning
 from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-from apache_beam.tools import utils
 from apache_beam.transforms import environments
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp
-from apache_beam.utils import windowed_value
 
 if statesampler.FAST_SAMPLER:
   DEFAULT_SAMPLING_PERIOD_MS = statesampler.DEFAULT_SAMPLING_PERIOD_MS
@@ -128,174 +121,6 @@ class FnApiRunnerTest(unittest.TestCase):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))
 
-  def test_batch_pardo(self):
-    with self.create_pipeline() as p:
-      res = (
-          p
-          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
-              np.int64)
-          | beam.ParDo(ArrayMultiplyDoFn())
-          | beam.Map(lambda x: x * 3))
-
-      assert_that(res, equal_to([6, 12, 18]))
-
-  def test_batch_pardo_trigger_flush(self):
-    try:
-      utils.check_compiled('apache_beam.coders.coder_impl')
-    except RuntimeError:
-      self.skipTest(
-          'BEAM-14410: FnRunnerTest with non-trivial inputs flakes '
-          'in non-cython environments')
-
-    with self.create_pipeline() as p:
-      res = (
-          p
-          # Pass more than GeneralPurposeConsumerSet.MAX_BATCH_SIZE elements
-          # here to make sure we exercise the batch size limit.
-          | beam.Create(np.array(range(5000),
-                                 dtype=np.int64)).with_output_types(np.int64)
-          | beam.ParDo(ArrayMultiplyDoFn())
-          | beam.Map(lambda x: x * 3))
-
-      assert_that(res, equal_to([i * 2 * 3 for i in range(5000)]))
-
-  def test_batch_rebatch_pardos(self):
-    # Should raise a warning about the rebatching that mentions:
-    # - The consuming DoFn
-    # - The output batch type of the producer
-    # - The input batch type of the consumer
-    with self.assertWarnsRegex(InefficientExecutionWarning,
-                               r'ListPlusOneDoFn.*NumpyArray.*List\[int64\]'):
-      with self.create_pipeline() as p:
-        res = (
-            p
-            | beam.Create(np.array([1, 2, 3],
-                                   dtype=np.int64)).with_output_types(np.int64)
-            | beam.ParDo(ArrayMultiplyDoFn())
-            | beam.ParDo(ListPlusOneDoFn())
-            | beam.Map(lambda x: x * 3))
-
-        assert_that(res, equal_to([9, 15, 21]))
-
-  def test_batch_pardo_fusion_break(self):
-    class NormalizeDoFn(beam.DoFn):
-      @no_type_check
-      def process_batch(
-          self,
-          batch: np.ndarray,
-          mean: np.float64,
-      ) -> Iterator[np.ndarray]:
-        assert isinstance(batch, np.ndarray)
-        yield batch - mean
-
-      # infer_output_type must be defined (when there's no process method),
-      # otherwise we don't know the input type is the same as output type.
-      def infer_output_type(self, input_type):
-        return np.float64
-
-    with self.create_pipeline() as p:
-      pc = (
-          p
-          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
-              np.int64)
-          | beam.ParDo(ArrayMultiplyDoFn()))
-
-      res = (
-          pc
-          | beam.ParDo(
-              NormalizeDoFn(),
-              mean=beam.pvalue.AsSingleton(
-                  pc | beam.CombineGlobally(beam.combiners.MeanCombineFn()))))
-      assert_that(res, equal_to([-2, 0, 2]))
-
-  def test_batch_pardo_dofn_params(self):
-    class ConsumeParamsDoFn(beam.DoFn):
-      @no_type_check
-      def process_batch(
-          self,
-          batch: np.ndarray,
-          ts=beam.DoFn.TimestampParam,
-          pane_info=beam.DoFn.PaneInfoParam,
-      ) -> Iterator[np.ndarray]:
-        assert isinstance(batch, np.ndarray)
-        assert isinstance(ts, timestamp.Timestamp)
-        assert isinstance(pane_info, windowed_value.PaneInfo)
-
-        yield batch * ts.seconds()
-
-      # infer_output_type must be defined (when there's no process method),
-      # otherwise we don't know the input type is the same as output type.
-      def infer_output_type(self, input_type):
-        return input_type
-
-    with self.create_pipeline() as p:
-      res = (
-          p
-          | beam.Create(np.array(range(10), dtype=np.int64)).with_output_types(
-              np.int64)
-          | beam.Map(lambda t: window.TimestampedValue(t, int(t % 2))).
-          with_output_types(np.int64)
-          | beam.ParDo(ConsumeParamsDoFn()))
-
-      assert_that(res, equal_to([0, 1, 0, 3, 0, 5, 0, 7, 0, 9]))
-
-  def test_batch_pardo_window_param(self):
-    class PerWindowDoFn(beam.DoFn):
-      @no_type_check
-      def process_batch(
-          self,
-          batch: np.ndarray,
-          window=beam.DoFn.WindowParam,
-      ) -> Iterator[np.ndarray]:
-        yield batch * window.start.seconds()
-
-      # infer_output_type must be defined (when there's no process method),
-      # otherwise we don't know the input type is the same as output type.
-      def infer_output_type(self, input_type):
-        return input_type
-
-    with self.create_pipeline() as p:
-      res = (
-          p
-          | beam.Create(np.array(range(10), dtype=np.int64)).with_output_types(
-              np.int64)
-          | beam.Map(lambda t: window.TimestampedValue(t, int(t))).
-          with_output_types(np.int64)
-          | beam.WindowInto(window.FixedWindows(5))
-          | beam.ParDo(PerWindowDoFn()))
-
-      assert_that(res, equal_to([0, 0, 0, 0, 0, 25, 30, 35, 40, 45]))
-
-  def test_batch_pardo_overlapping_windows(self):
-    class PerWindowDoFn(beam.DoFn):
-      @no_type_check
-      def process_batch(self,
-                        batch: np.ndarray,
-                        window=beam.DoFn.WindowParam) -> Iterator[np.ndarray]:
-        yield batch * window.start.seconds()
-
-      # infer_output_type must be defined (when there's no process method),
-      # otherwise we don't know the input type is the same as output type.
-      def infer_output_type(self, input_type):
-        return input_type
-
-    with self.create_pipeline() as p:
-      res = (
-          p
-          | beam.Create(np.array(range(10), dtype=np.int64)).with_output_types(
-              np.int64)
-          | beam.Map(lambda t: window.TimestampedValue(t, int(t))).
-          with_output_types(np.int64)
-          | beam.WindowInto(window.SlidingWindows(size=5, period=3))
-          | beam.ParDo(PerWindowDoFn()))
-
-      assert_that(res, equal_to([               0*-3, 1*-3, # [-3, 2)
-                                 0*0, 1*0, 2*0, 3* 0, 4* 0, # [ 0, 5)
-                                 3*3, 4*3, 5*3, 6* 3, 7* 3, # [ 3, 8)
-                                 6*6, 7*6, 8*6, 9* 6,       # [ 6, 11)
-                                 9*9                        # [ 9, 14)
-                                 ]))
-
   @retry(stop=stop_after_attempt(3))
   def test_pardo_side_outputs(self):
     def tee(elem, *tags):
@@ -2308,33 +2133,6 @@ class ExpectingSideInputsFn(beam.DoFn):
     yield self._name
 
 
-class ArrayMultiplyDoFn(beam.DoFn):
-  def process_batch(self, batch: np.ndarray, *unused_args,
-                    **unused_kwargs) -> Iterator[np.ndarray]:
-    assert isinstance(batch, np.ndarray)
-    # GeneralPurposeConsumerSet should limit batches to MAX_BATCH_SIZE (4096)
-    # elements
-    assert np.size(batch, axis=0) <= 4096
-    yield batch * 2
-
-  # infer_output_type must be defined (when there's no process method),
-  # otherwise we don't know the input type is the same as output type.
-  def infer_output_type(self, input_type):
-    return input_type
-
-
-class ListPlusOneDoFn(beam.DoFn):
-  def process_batch(self, batch: List[np.int64], *unused_args,
-                    **unused_kwargs) -> Iterator[List[np.int64]]:
-    assert isinstance(batch, list)
-    yield [element + 1 for element in batch]
-
-  # infer_output_type must be defined (when there's no process method),
-  # otherwise we don't know the input type is the same as output type.
-  def infer_output_type(self, input_type):
-    return input_type
-
-
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index e13b25d8eba..b0404640ac7 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -291,11 +291,6 @@ class PortableRunnerTestWithSubprocesses(PortableRunnerTest):
         str(job_port),
     ]
 
-  def test_batch_rebatch_pardos(self):
-    raise unittest.SkipTest(
-        "Portable runners with subprocess can't make "
-        "assertions about warnings raised on the worker.")
-
 
 class PortableRunnerTestWithSubprocessesAndMultiWorkers(
     PortableRunnerTestWithSubprocesses):
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 1f6239a6dc6..3ce8fe58da5 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -156,8 +156,8 @@ class DataOutputOperation(RunnerIOOperation):
 
   def finish(self):
     # type: () -> None
-    super().finish()
     self.output_stream.close()
+    super().finish()
 
 
 class DataInputOperation(RunnerIOOperation):
@@ -183,28 +183,21 @@ class DataInputOperation(RunnerIOOperation):
         windowed_coder,
         transform_id=transform_id,
         data_channel=data_channel)
-
-    self.consumer = next(iter(consumers.values()))
+    # We must do this manually as we don't have a spec or spec.output_coders.
+    self.receivers = [
+        operations.ConsumerSet.create(
+            self.counter_factory,
+            self.name_context.step_name,
+            0,
+            next(iter(consumers.values())),
+            self.windowed_coder,
+            self._get_runtime_performance_hints())
+    ]
     self.splitting_lock = threading.Lock()
     self.index = -1
     self.stop = float('inf')
     self.started = False
 
-  def setup(self):
-    with self.scoped_start_state:
-      super().setup()
-      # We must do this manually as we don't have a spec or spec.output_coders.
-      self.receivers = [
-          operations.ConsumerSet.create(
-              self.counter_factory,
-              self.name_context.step_name,
-              0,
-              self.consumer,
-              self.windowed_coder,
-              self.get_output_batch_converter(),
-              self._get_runtime_performance_hints())
-      ]
-
   def start(self):
     # type: () -> None
     super().start()
@@ -327,7 +320,6 @@ class DataInputOperation(RunnerIOOperation):
 
   def finish(self):
     # type: () -> None
-    super().finish()
     with self.splitting_lock:
       self.index += 1
       self.started = False
@@ -869,7 +861,7 @@ class BundleProcessor(object):
         'fnapi-step-%s' % self.process_bundle_descriptor.id,
         self.counter_factory)
     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
-    for op in reversed(self.ops.values()):
+    for op in self.ops.values():
       op.setup()
     self.splitting_lock = threading.Lock()
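
(Note on the bundle_processor.py hunks above: the reverted change had moved
DataInputOperation's receiver construction from __init__ into setup(), and
had run op.setup() in reverse order, presumably so that downstream
operations were initialized before upstream ones queried them, e.g. for an
input batch converter. A hypothetical sketch of that ordering constraint;
the Op class and the upstream-first list order are assumptions for
illustration, not Beam's actual execution tree:)

    class Op:
      def __init__(self, name, consumers=()):
        self.name, self.consumers, self.ready = name, list(consumers), False

      def setup(self):
        # A batch-aware producer would inspect its consumers here, which
        # requires the consumers to have been set up already.
        assert all(c.ready for c in self.consumers), self.name
        self.ready = True

    sink = Op('sink')
    source = Op('source', consumers=[sink])
    ops = [source, sink]      # assumed upstream-first ordering
    for op in reversed(ops):  # as in the reverted code: consumers first
      op.setup()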
 
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.pxd b/sdks/python/apache_beam/runners/worker/opcounters.pxd
index ef2d776eabd..8b7d80375ef 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.pxd
+++ b/sdks/python/apache_beam/runners/worker/opcounters.pxd
@@ -60,10 +60,8 @@ cdef class OperationCounters(object):
   cdef public libc.stdint.int64_t _sample_counter
   cdef public libc.stdint.int64_t _next_sample
   cdef public object output_type_constraints
-  cdef public object producer_batch_converter
 
   cpdef update_from(self, windowed_value)
-  cpdef update_from_batch(self, windowed_batch)
   cdef inline do_sample(self, windowed_value)
   cpdef update_collect(self)
   cpdef type_check(self, value)
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index 1dbcbb7a22c..fad54aaeaf8 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -32,13 +32,12 @@ from typing import Optional
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints.decorators import _check_instance_type
 from apache_beam.utils import counters
-from apache_beam.utils import windowed_value
 from apache_beam.utils.counters import Counter
 from apache_beam.utils.counters import CounterName
 
 if TYPE_CHECKING:
+  from apache_beam.utils import windowed_value
   from apache_beam.runners.worker.statesampler import StateSampler
-  from apache_beam.typehints.batch import BatchConverter
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -190,9 +189,7 @@ class OperationCounters(object):
       coder,
       index,
       suffix='out',
-      producer_type_hints=None,
-      producer_batch_converter=None, # type: Optional[BatchConverter]
-  ):
+      producer_type_hints=None):
     self._counter_factory = counter_factory
     self.element_counter = counter_factory.get_counter(
         '%s-%s%s-ElementCount' % (step_name, suffix, index), Counter.SUM)
@@ -205,7 +202,6 @@ class OperationCounters(object):
     self._sample_counter = 0
     self._next_sample = 0
     self.output_type_constraints = producer_type_hints or {}
-    self.producer_batch_converter = producer_batch_converter
 
   def update_from(self, windowed_value):
     # type: (windowed_value.WindowedValue) -> None
@@ -214,15 +210,6 @@ class OperationCounters(object):
     if self._should_sample():
       self.do_sample(windowed_value)
 
-  def update_from_batch(self, windowed_batch):
-    # type: (windowed_value.WindowedBatch) -> None
-    assert self.producer_batch_converter is not None
-    assert isinstance(windowed_batch, windowed_value.HomogeneousWindowedBatch)
-
-    self.element_counter.update(
-        self.producer_batch_converter.get_length(windowed_batch.values))
-    # TODO(BEAM-14408): Update byte size estimate
-
   def _observable_callback(self, inner_coder_impl, accumulator):
     def _observable_callback_inner(value, is_encoded=False):
       # TODO(ccy): If this stream is large, sample it as well.
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index cf1f1b3fb51..800e5870b96 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -21,7 +21,6 @@ from apache_beam.runners.common cimport DoFnRunner
 from apache_beam.runners.common cimport Receiver
 from apache_beam.runners.worker cimport opcounters
 from apache_beam.utils.windowed_value cimport WindowedValue
-from apache_beam.utils.windowed_value cimport WindowedBatch
 #from libcpp.string cimport string
 
 cdef WindowedValue _globally_windowed_value
@@ -35,28 +34,14 @@ cdef class ConsumerSet(Receiver):
   cdef public output_index
   cdef public coder
 
+  cpdef receive(self, WindowedValue windowed_value)
   cpdef update_counters_start(self, WindowedValue windowed_value)
   cpdef update_counters_finish(self)
-  cpdef update_counters_batch(self, WindowedBatch windowed_batch)
-
-cdef class SingletonElementConsumerSet(ConsumerSet):
-  cdef Operation consumer
 
-  cpdef receive(self, WindowedValue windowed_value)
-  cpdef receive_batch(self, WindowedBatch windowed_batch)
-  cpdef flush(self)
 
-cdef class GeneralPurposeConsumerSet(ConsumerSet):
-  cdef list element_consumers
-  cdef list passthrough_batch_consumers
-  cdef dict other_batch_consumers
-  cdef bint has_batch_consumers
-  cdef list _batched_elements
-  cdef object producer_batch_converter
+cdef class SingletonConsumerSet(ConsumerSet):
+  cdef Operation consumer
 
-  cpdef receive(self, WindowedValue windowed_value)
-  cpdef receive_batch(self, WindowedBatch windowed_batch)
-  cpdef flush(self)
 
 cdef class Operation(object):
   cdef readonly name_context
@@ -111,7 +96,6 @@ cdef class DoOperation(Operation):
   cdef public dict timer_inputs
   cdef dict timer_specs
   cdef public object input_info
-  cdef object fn
 
 
 cdef class SdfProcessSizedElements(DoOperation):
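
(The operations.pxd hunk above restores the pre-batching receiver hierarchy:
ConsumerSet with a single receive(WindowedValue) entry point plus a
SingletonConsumerSet fast path, replacing the SingletonElementConsumerSet /
GeneralPurposeConsumerSet split whose removal continues in operations.py
below. Roughly, the reverted GeneralPurposeConsumerSet buffered elements for
batch consumers and flushed on a size limit; a simplified, hypothetical
sketch of that behavior, with the 4096 limit taken from the removed
fn_runner_test assertions:)

    class BufferingConsumerSet:
      MAX_BATCH_SIZE = 4096  # per the removed test's assertion

      def __init__(self, element_consumers, batch_consumers):
        self.element_consumers = element_consumers
        self.batch_consumers = batch_consumers
        self._buffer = []

      def receive(self, value):
        for c in self.element_consumers:
          c.process(value)            # element path: forward immediately
        if self.batch_consumers:
          self._buffer.append(value)  # batch path: buffer, flush on limit
          if len(self._buffer) >= self.MAX_BATCH_SIZE:
            self.flush()

      def flush(self):
        if self._buffer:
          batch, self._buffer = self._buffer, []
          for c in self.batch_consumers:
            c.process_batch(batch)
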
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index f46a4a18369..3464c5750c5 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -26,7 +26,6 @@
 import collections
 import logging
 import threading
-import warnings
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import DefaultDict
@@ -61,8 +60,6 @@ from apache_beam.transforms import window
 from apache_beam.transforms.combiners import PhasedCombineFnExecutor
 from apache_beam.transforms.combiners import curry_combine_fn
 from apache_beam.transforms.window import GlobalWindows
-from apache_beam.typehints.batch import BatchConverter
-from apache_beam.utils.windowed_value import WindowedBatch
 from apache_beam.utils.windowed_value import WindowedValue
 
 if TYPE_CHECKING:
@@ -107,56 +104,53 @@ class ConsumerSet(Receiver):
              output_index,
              consumers,  # type: List[Operation]
              coder,
-             producer_type_hints,
-             producer_batch_converter, # type: Optional[BatchConverter]
+             producer_type_hints
              ):
     # type: (...) -> ConsumerSet
     if len(consumers) == 1:
-      consumer = consumers[0]
-
-      consumer_batch_preference = consumer.get_batching_preference()
-      consumer_batch_converter = consumer.get_input_batch_converter()
-      if (not consumer_batch_preference.supports_batches and
-          producer_batch_converter is None and
-          consumer_batch_converter is None):
-        return SingletonElementConsumerSet(
-            counter_factory,
-            step_name,
-            output_index,
-            consumer,
-            coder,
-            producer_type_hints)
-
-    return GeneralPurposeConsumerSet(
-        counter_factory,
-        step_name,
-        output_index,
-        coder,
-        producer_type_hints,
-        consumers,
-        producer_batch_converter)
+      return SingletonConsumerSet(
+          counter_factory,
+          step_name,
+          output_index,
+          consumers,
+          coder,
+          producer_type_hints)
+    else:
+      return ConsumerSet(
+          counter_factory,
+          step_name,
+          output_index,
+          consumers,
+          coder,
+          producer_type_hints)
 
   def __init__(self,
                counter_factory,
                step_name,  # type: str
                output_index,
-               consumers,
+               consumers,  # type: List[Operation]
                coder,
-               producer_type_hints,
-               producer_batch_converter
+               producer_type_hints
                ):
+    self.consumers = consumers
+
     self.opcounter = opcounters.OperationCounters(
         counter_factory,
         step_name,
         coder,
         output_index,
-        producer_type_hints=producer_type_hints,
-        producer_batch_converter=producer_batch_converter)
+        producer_type_hints=producer_type_hints)
     # Used in repr.
     self.step_name = step_name
     self.output_index = output_index
     self.coder = coder
-    self.consumers = consumers
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+    for consumer in self.consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+    self.update_counters_finish()
 
   def try_split(self, fraction_of_remainder):
     # type: (...) -> Optional[Any]
@@ -187,10 +181,6 @@ class ConsumerSet(Receiver):
     # type: () -> None
     self.opcounter.update_collect()
 
-  def update_counters_batch(self, windowed_batch):
-    # type: (WindowedBatch) -> None
-    self.opcounter.update_from_batch(windowed_batch)
-
   def __repr__(self):
     return '%s[%s.out%s, coder=%s, len(consumers)=%s]' % (
         self.__class__.__name__,
@@ -200,25 +190,24 @@ class ConsumerSet(Receiver):
         len(self.consumers))
 
 
-class SingletonElementConsumerSet(ConsumerSet):
-  """ConsumerSet representing a single consumer that can only process elements
-  (not batches)."""
+class SingletonConsumerSet(ConsumerSet):
   def __init__(self,
                counter_factory,
                step_name,
                output_index,
-               consumer,  # type: Operation
+               consumers,  # type: List[Operation]
                coder,
                producer_type_hints
                ):
-    super().__init__(
+    assert len(consumers) == 1
+    super(SingletonConsumerSet, self).__init__(
         counter_factory,
         step_name,
-        output_index, [consumer],
+        output_index,
+        consumers,
         coder,
-        producer_type_hints,
-        None)
-    self.consumer = consumer
+        producer_type_hints)
+    self.consumer = consumers[0]
 
   def receive(self, windowed_value):
     # type: (WindowedValue) -> None
@@ -226,14 +215,6 @@ class SingletonElementConsumerSet(ConsumerSet):
     self.consumer.process(windowed_value)
     self.update_counters_finish()
 
-  def receive_batch(self, windowed_batch):
-    raise AssertionError(
-        "SingletonElementConsumerSet.receive_batch is not implemented")
-
-  def flush(self):
-    # SingletonElementConsumerSet has no buffer to flush
-    pass
-
   def try_split(self, fraction_of_remainder):
     # type: (...) -> Optional[Any]
     return self.consumer.try_split(fraction_of_remainder)
@@ -242,133 +223,6 @@ class SingletonElementConsumerSet(ConsumerSet):
     return self.consumer.current_element_progress()
 
 
-class GeneralPurposeConsumerSet(ConsumerSet):
-  """ConsumerSet implementation that handles all combinations of possible edges.
-  """
-  MAX_BATCH_SIZE = 4096
-
-  def __init__(self,
-               counter_factory,
-               step_name,  # type: str
-               output_index,
-               coder,
-               producer_type_hints,
-               consumers,  # type: List[Operation]
-               producer_batch_converter):
-    super().__init__(
-        counter_factory,
-        step_name,
-        output_index,
-        consumers,
-        coder,
-        producer_type_hints,
-        producer_batch_converter)
-
-    self.producer_batch_converter = producer_batch_converter
-
-    # Partition consumers into three groups:
-    # - consumers that will be passed elements
-    # - consumers that will be passed batches (where their input batch type
-    #   matches the output of the producer)
-    # - consumers that will be passed converted batches
-    self.element_consumers: List[Operation] = []
-    self.passthrough_batch_consumers: List[Operation] = []
-    other_batch_consumers: DefaultDict[
-        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
-
-    for consumer in consumers:
-      if not consumer.get_batching_preference().supports_batches:
-        self.element_consumers.append(consumer)
-      elif (consumer.get_input_batch_converter() ==
-            self.producer_batch_converter):
-        self.passthrough_batch_consumers.append(consumer)
-      else:
-        # Batch consumer with a mismatched batch type
-        if consumer.get_batching_preference().supports_elements:
-          # Pass it elements if we can
-          self.element_consumers.append(consumer)
-        else:
-          # As a last resort, explode and rebatch
-          consumer_batch_converter = consumer.get_input_batch_converter()
-          # This consumer supports batches, it must have a batch converter
-          assert consumer_batch_converter is not None
-          other_batch_consumers[consumer_batch_converter].append(consumer)
-
-    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
-        other_batch_consumers)
-
-    self.has_batch_consumers = (
-        self.passthrough_batch_consumers or self.other_batch_consumers)
-    self._batched_elements: List[Any] = []
-
-  def receive(self, windowed_value):
-    # type: (WindowedValue) -> None
-
-    self.update_counters_start(windowed_value)
-
-    for consumer in self.element_consumers:
-      cython.cast(Operation, consumer).process(windowed_value)
-
-    # TODO: Do this branching when constructing ConsumerSet
-    if self.has_batch_consumers:
-      self._batched_elements.append(windowed_value)
-      if len(self._batched_elements) >= self.MAX_BATCH_SIZE:
-        self.flush()
-
-    # TODO(BEAM-14408): Properly estimate sizes in the batch-consumer-only
-    # case; this undercounts large iterables.
-    self.update_counters_finish()
-
-  def receive_batch(self, windowed_batch):
-    if self.element_consumers:
-      for wv in windowed_batch.as_windowed_values(
-          self.producer_batch_converter.explode_batch):
-        for consumer in self.element_consumers:
-          cython.cast(Operation, consumer).process(wv)
-
-    for consumer in self.passthrough_batch_consumers:
-      cython.cast(Operation, consumer).process_batch(windowed_batch)
-
-    for (consumer_batch_converter,
-         consumers) in self.other_batch_consumers.items():
-      # Explode and rebatch into the new batch type (ouch!)
-      # TODO: Register direct conversions for equivalent batch types
-
-      for consumer in consumers:
-        warnings.warn(
-            f"Input to operation {consumer} must be rebatched from type "
-            f"{self.producer_batch_converter.batch_type!r} to "
-            f"{consumer_batch_converter.batch_type!r}.\n"
-            "This is very inefficient, consider re-structuring your pipeline "
-            "or adding a DoFn to directly convert between these types.",
-            InefficientExecutionWarning)
-        cython.cast(Operation, consumer).process_batch(
-            windowed_batch.with_values(
-                consumer_batch_converter.produce_batch(
-                    self.producer_batch_converter.explode_batch(
-                        windowed_batch.values))))
-
-    self.update_counters_batch(windowed_batch)
-
-  def flush(self):
-    if not self.has_batch_consumers or not self._batched_elements:
-      return
-
-    for batch_converter, consumers in self.other_batch_consumers.items():
-      for windowed_batch in WindowedBatch.from_windowed_values(
-          self._batched_elements, produce_fn=batch_converter.produce_batch):
-        for consumer in consumers:
-          cython.cast(Operation, consumer).process_batch(windowed_batch)
-
-    for consumer in self.passthrough_batch_consumers:
-      for windowed_batch in WindowedBatch.from_windowed_values(
-          self._batched_elements,
-          produce_fn=self.producer_batch_converter.produce_batch):
-        cython.cast(Operation, consumer).process_batch(windowed_batch)
-
-    self._batched_elements = []
-
-
 class Operation(object):
   """An operation representing the live version of a work item specification.
 
@@ -438,9 +292,7 @@ class Operation(object):
                 i,
                 self.consumers[i],
                 coder,
-                self._get_runtime_performance_hints(),
-                self.get_output_batch_converter(),
-            ) for i,
+                self._get_runtime_performance_hints()) for i,
             coder in enumerate(self.spec.output_coders)
         ]
     self.setup_done = True
@@ -453,29 +305,12 @@ class Operation(object):
       # For legacy workers.
       self.setup()
 
-  def get_batching_preference(self):
-    # By default, operations don't support batching; require the Receiver to unbatch
-    return common.BatchingPreference.BATCH_FORBIDDEN
-
-  def get_input_batch_converter(self) -> Optional[BatchConverter]:
-    """Returns a batch type converter if this operation can accept a batch,
-    otherwise None."""
-    return None
-
-  def get_output_batch_converter(self) -> Optional[BatchConverter]:
-    """Returns a batch type converter if this operation can produce a batch,
-    otherwise None."""
-    return None
-
   def process(self, o):
     # type: (WindowedValue) -> None
 
     """Process element in operation."""
     pass
 
-  def process_batch(self, batch: WindowedBatch):
-    pass
-
   def finalize_bundle(self):
     # type: () -> None
     pass
@@ -494,8 +329,7 @@ class Operation(object):
     # type: () -> None
 
     """Finish operation."""
-    for receiver in self.receivers:
-      cython.cast(Receiver, receiver).flush()
+    pass
 
   def teardown(self):
     # type: () -> None
@@ -678,8 +512,7 @@ class ImpulseReadOperation(Operation):
             0,
             next(iter(consumers.values())),
             output_coder,
-            self._get_runtime_performance_hints(),
-            self.get_output_batch_converter())
+            self._get_runtime_performance_hints())
     ]
 
   def process(self, unused_impulse):
@@ -711,8 +544,8 @@ class _TaggedReceivers(dict):
     self._step_name = step_name
 
   def __missing__(self, tag):
-    self[tag] = receiver = ConsumerSet.create(
-        self._counter_factory, self._step_name, tag, [], None, None, None)
+    self[tag] = receiver = ConsumerSet(
+        self._counter_factory, self._step_name, tag, [], None, None)
     return receiver
 
   def total_output_bytes(self):
@@ -754,10 +587,6 @@ class DoOperation(Operation):
     # A mapping of timer tags to the input "PCollections" they come in on.
     self.input_info = None  # type: Optional[OpInputInfo]
 
-    # See fn_data in dataflow_runner.py
-    # TODO: Store all the items from spec?
-    self.fn, _, _, _, _ = (pickler.loads(self.spec.serialized_fn))
-
   def _read_side_inputs(self, tags_and_types):
     # type: (...) -> Iterator[apache_sideinputs.SideInputMap]
 
@@ -873,21 +702,6 @@ class DoOperation(Operation):
       super(DoOperation, self).start()
       self.dofn_runner.start()
 
-  def get_batching_preference(self):
-    if self.fn.process_batch_defined:
-      if self.fn.process_defined:
-        return common.BatchingPreference.DO_NOT_CARE
-      else:
-        return common.BatchingPreference.BATCH_REQUIRED
-    else:
-      return common.BatchingPreference.BATCH_FORBIDDEN
-
-  def get_input_batch_converter(self) -> Optional[BatchConverter]:
-    return getattr(self.fn, 'input_batch_converter', None)
-
-  def get_output_batch_converter(self) -> Optional[BatchConverter]:
-    return getattr(self.fn, 'output_batch_converter', None)
-
   def process(self, o):
     # type: (WindowedValue) -> None
     with self.scoped_process_state:
@@ -898,9 +712,6 @@ class DoOperation(Operation):
           self.execution_context.delayed_applications.append(
               (self, delayed_application))
 
-  def process_batch(self, windowed_batch: WindowedBatch) -> None:
-    self.dofn_runner.process_batch(windowed_batch)
-
   def finalize_bundle(self):
     # type: () -> None
     self.dofn_runner.finalize()
@@ -924,7 +735,6 @@ class DoOperation(Operation):
 
   def finish(self):
     # type: () -> None
-    super(DoOperation, self).finish()
     with self.scoped_finish_state:
       self.dofn_runner.finish()
       if self.user_state_context:
@@ -1111,7 +921,6 @@ class CombineOperation(Operation):
   def finish(self):
     # type: () -> None
     _LOGGER.debug('Finishing %s', self)
-    super(CombineOperation, self).finish()
 
   def teardown(self):
     # type: () -> None
@@ -1157,7 +966,6 @@ class PGBKOperation(Operation):
   def finish(self):
     # type: () -> None
     self.flush(0)
-    super().finish()
 
   def flush(self, target):
     # type: (int) -> None
@@ -1462,11 +1270,3 @@ class SimpleMapTaskExecutor(object):
       op.start()
     for op in self._ops:
       op.finish()
-
-
-class InefficientExecutionWarning(RuntimeWarning):
-  """warning to indicate an inefficiency in a Beam pipeline."""
-
-
-# Don't ignore InefficientExecutionWarning, but only log them once
-warnings.simplefilter('once', InefficientExecutionWarning)
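The bulk of the removal above is GeneralPurposeConsumerSet, whose constructor routed each downstream operation by how it consumes output. A condensed sketch of that routing decision, assuming the get_batching_preference() and get_input_batch_converter() hooks shown in the deleted code (consumers and producer_batch_converter are the constructor arguments):

    import collections

    element_consumers = []
    passthrough_batch_consumers = []
    other_batch_consumers = collections.defaultdict(list)

    for consumer in consumers:
        preference = consumer.get_batching_preference()
        converter = consumer.get_input_batch_converter()
        if not preference.supports_batches:
            # Consumer only understands elements.
            element_consumers.append(consumer)
        elif converter == producer_batch_converter:
            # Batch types line up: hand batches through untouched.
            passthrough_batch_consumers.append(consumer)
        elif preference.supports_elements:
            # Mismatched batch type, but elements are acceptable.
            element_consumers.append(consumer)
        else:
            # Last resort: explode the producer's batches and rebatch,
            # which the deleted code flags with InefficientExecutionWarning.
            other_batch_consumers[converter].append(consumer)
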
diff --git a/sdks/python/apache_beam/transforms/batch_dofn_test.py b/sdks/python/apache_beam/transforms/batch_dofn_test.py
index f1fc7eda093..5f05e371ec6 100644
--- a/sdks/python/apache_beam/transforms/batch_dofn_test.py
+++ b/sdks/python/apache_beam/transforms/batch_dofn_test.py
@@ -22,7 +22,6 @@
 import unittest
 from typing import Iterator
 from typing import List
-from typing import no_type_check
 
 from parameterized import parameterized_class
 
@@ -128,19 +127,6 @@ class BatchDoFnTest(unittest.TestCase):
                                 r'BatchDoFnNoInputAnnotation.process_batch'):
       _ = pc | beam.ParDo(BatchDoFnNoInputAnnotation())
 
-  def test_unsupported_dofn_param_raises(self):
-    class BatchDoFnBadParam(beam.DoFn):
-      @no_type_check
-      def process_batch(self, batch: List[int], key=beam.DoFn.KeyParam):
-        yield batch * key
-
-    p = beam.Pipeline()
-    pc = p | beam.Create([1, 2, 3])
-
-    with self.assertRaisesRegex(NotImplementedError,
-                                r'.*BatchDoFnBadParam.*KeyParam'):
-      _ = pc | beam.ParDo(BatchDoFnBadParam())
-
 
 if __name__ == '__main__':
   unittest.main()
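The deleted test exercised a process_batch method taking an unsupported DoFn parameter. For reference, a batched DoFn of the minimal shape these tests target (illustrative only; worker support for running it is exactly what this commit reverts):

    from typing import Iterator, List

    import apache_beam as beam

    class MultiplyByTwo(beam.DoFn):
        def process_batch(self, batch: List[int]) -> Iterator[List[int]]:
            # Invoked with a whole batch of ints instead of one element.
            yield [n * 2 for n in batch]
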
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index d4fdfb14c18..a22b408378e 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -33,8 +33,6 @@ from typing import Tuple
 from typing import TypeVar
 from typing import Union
 
-import numpy as np
-
 from apache_beam import typehints
 from apache_beam.transforms import core
 from apache_beam.transforms import cy_combiners
@@ -90,7 +88,7 @@ class Mean(object):
 
 # TODO(laolu): This type signature is overly restrictive. This should be
 # more general.
-@with_input_types(Union[float, int, np.int64, np.float64])
+@with_input_types(Union[float, int])
 @with_output_types(float)
 class MeanCombineFn(core.CombineFn):
   """CombineFn for computing an arithmetic mean."""
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index f3ecd00cccc..191ba8a1f76 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -2045,7 +2045,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     expected_msg = \
       "Type hint violation for 'CombinePerKey': " \
-      "requires Tuple[TypeVariable[K], Union[float, float64, int, int64]] " \
+      "requires Tuple[TypeVariable[K], Union[float, int]] " \
       "but got Tuple[None, str] for element"
 
     self.assertStartswith(e.exception.args[0], expected_msg)
@@ -2111,7 +2111,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     expected_msg = \
       "Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
-      "requires Tuple[TypeVariable[K], Union[float, float64, int, int64]] " \
+      "requires Tuple[TypeVariable[K], Union[float, int]] " \
       "but got Tuple[str, str] for element"
 
     self.assertStartswith(e.exception.args[0], expected_msg)
@@ -2151,8 +2151,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       "Runtime type violation detected within " \
       "OddMean/CombinePerKey(MeanCombineFn): " \
       "Type-hint for argument: 'element' violated: " \
-      "Union[float, float64, int, int64] type-constraint violated. " \
-      "Expected an instance of one of: ('float', 'float64', 'int', 'int64'), " \
+      "Union[float, int] type-constraint violated. " \
+      "Expected an instance of one of: ('float', 'int'), " \
       "received str instead"
 
     self.assertStartswith(e.exception.args[0], expected_msg)
diff --git a/sdks/python/apache_beam/typehints/batch.py b/sdks/python/apache_beam/typehints/batch.py
index 47294bb1f6e..3fcf79ec45a 100644
--- a/sdks/python/apache_beam/typehints/batch.py
+++ b/sdks/python/apache_beam/typehints/batch.py
@@ -226,12 +226,6 @@ class NumpyTypeHint():
     def __hash__(self) -> int:
       return hash(self.__key())
 
-    def __repr__(self):
-      if self.shape == (N, ):
-        return f'NumpyArray[{self.dtype!r}]'
-      else:
-        return f'NumpyArray[{self.dtype!r}, {self.shape!r}]'
-
   def __getitem__(self, value):
     if isinstance(value, tuple):
       if len(value) == 2:
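The removed __repr__ gave NumpyArray type hints a readable form. A sketch of what is dropped (assuming the NumpyArray constraint exported by apache_beam.typehints.batch; the output shown is approximate):

    import numpy as np

    from apache_beam.typehints.batch import NumpyArray

    repr(NumpyArray[np.int64])          # pre-revert: NumpyArray[dtype('int64')]
    repr(NumpyArray[np.int64, (3, 2)])  # pre-revert: NumpyArray[dtype('int64'), (3, 2)]
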
diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd b/sdks/python/apache_beam/utils/windowed_value.pxd
index 91e36789d69..5d867c83c38 100644
--- a/sdks/python/apache_beam/utils/windowed_value.pxd
+++ b/sdks/python/apache_beam/utils/windowed_value.pxd
@@ -43,14 +43,6 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
-cdef class WindowedBatch(object):
-  cpdef WindowedBatch with_values(self, object new_values)
-
-cdef class HomogeneousWindowedBatch(WindowedBatch):
-  cdef public WindowedValue _wv
-
-  cpdef WindowedBatch with_values(self, object new_values)
-
 @cython.locals(wv=WindowedValue)
 cpdef WindowedValue create(
   object value, int64_t timestamp_micros, object windows, object pane_info=*)
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index d80becb41c0..08fca45c31c 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -30,14 +30,10 @@ This module is experimental. No backwards-compatibility guarantees.
 
 # pytype: skip-file
 
-import collections
 from typing import TYPE_CHECKING
 from typing import Any
-from typing import Callable
-from typing import Iterable
 from typing import List
 from typing import Optional
-from typing import Sequence
 from typing import Tuple
 
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
@@ -150,14 +146,10 @@ class PaneInfo(object):
   def __eq__(self, other):
     if self is other:
       return True
-
-    if isinstance(other, PaneInfo):
-      return (
-          self.is_first == other.is_first and self.is_last == other.is_last and
-          self.timing == other.timing and self.index == other.index and
-          self.nonspeculative_index == other.nonspeculative_index)
-
-    return NotImplemented
+    return (
+        self.is_first == other.is_first and self.is_last == other.is_last and
+        self.timing == other.timing and self.index == other.index and
+        self.nonspeculative_index == other.nonspeculative_index)
 
   def __hash__(self):
     return hash((
@@ -211,13 +203,13 @@ class WindowedValue(object):
       the pane that contained this value.  If None, will be set to
       PANE_INFO_UNKNOWN.
   """
-  def __init__(
-      self,
-      value,
-      timestamp,  # type: TimestampTypes
-      windows,  # type: Tuple[BoundedWindow, ...]
-      pane_info=PANE_INFO_UNKNOWN  # type: PaneInfo
-  ):
+
+  def __init__(self,
+               value,
+               timestamp,  # type: TimestampTypes
+               windows,  # type: Tuple[BoundedWindow, ...]
+               pane_info=PANE_INFO_UNKNOWN  # type: PaneInfo
+              ):
     # type: (...) -> None
     # For performance reasons, only timestamp_micros is stored by default
     # (as a C int). The Timestamp object is created on demand below.
@@ -250,18 +242,16 @@ class WindowedValue(object):
         self.pane_info)
 
   def __eq__(self, other):
-    if isinstance(other, WindowedValue):
-      return (
-          type(self) == type(other) and
-          self.timestamp_micros == other.timestamp_micros and
-          self.value == other.value and self.windows == other.windows and
-          self.pane_info == other.pane_info)
-    return NotImplemented
+    return (
+        type(self) == type(other) and
+        self.timestamp_micros == other.timestamp_micros and
+        self.value == other.value and self.windows == other.windows and
+        self.pane_info == other.pane_info)
 
   def __hash__(self):
     return ((hash(self.value) & 0xFFFFFFFFFFFFFFF) + 3 *
             (self.timestamp_micros & 0xFFFFFFFFFFFFFF) + 7 *
-            (hash(tuple(self.windows)) & 0xFFFFFFFFFFFFF) + 11 *
+            (hash(self.windows) & 0xFFFFFFFFFFFFF) + 11 *
             (hash(self.pane_info) & 0xFFFFFFFFFFFFF))
 
   def with_value(self, new_value):
@@ -280,8 +270,6 @@ class WindowedValue(object):
 
 
 # TODO(robertwb): Move this to a static method.
-
-
 def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   wv = WindowedValue.__new__(WindowedValue)
   wv.value = value
@@ -291,89 +279,6 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
-class WindowedBatch(object):
-  """A batch of N windowed values, each having a value, a timestamp and set of
-  windows."""
-  def with_values(self, new_values):
-    # type: (Any) -> WindowedBatch
-
-    """Creates a new WindowedBatch with the same timestamps and windows as this.
-
-    This is the fastest way to create a new WindowedBatch.
-    """
-    raise NotImplementedError
-
-  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
-    raise NotImplementedError
-
-  @staticmethod
-  def from_windowed_values(
-      windowed_values: Sequence[WindowedValue], *,
-      produce_fn: Callable) -> Iterable['WindowedBatch']:
-    return HomogeneousWindowedBatch.from_windowed_values(
-        windowed_values, produce_fn=produce_fn)
-
-
-class HomogeneousWindowedBatch(WindowedBatch):
-  """A WindowedBatch with Homogeneous event-time information, represented
-  internally as a WindowedValue.
-  """
-  def __init__(self, wv):
-    self._wv = wv
-
-  @staticmethod
-  def of(values, timestamp, windows, pane_info):
-    return HomogeneousWindowedBatch(
-        WindowedValue(values, timestamp, windows, pane_info))
-
-  @property
-  def values(self):
-    return self._wv.value
-
-  @property
-  def timestamp(self):
-    return self._wv.timestamp
-
-  @property
-  def pane_info(self):
-    return self._wv.pane_info
-
-  @property
-  def windows(self):
-    return self._wv.windows
-
-  @windows.setter
-  def windows(self, value):
-    self._wv.windows = value
-
-  def with_values(self, new_values):
-    # type: (Any) -> WindowedBatch
-    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
-
-  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
-    for value in explode_fn(self._wv.value):
-      yield self._wv.with_value(value)
-
-  def __eq__(self, other):
-    if isinstance(other, HomogeneousWindowedBatch):
-      return self._wv == other._wv
-    return NotImplemented
-
-  def __hash__(self):
-    return hash(self._wv)
-
-  @staticmethod
-  def from_windowed_values(
-      windowed_values: Sequence[WindowedValue], *,
-      produce_fn: Callable) -> Iterable['WindowedBatch']:
-    grouped = collections.defaultdict(lambda: [])
-    for wv in windowed_values:
-      grouped[wv.with_value(None)].append(wv.value)
-
-    for key, values in grouped.items():
-      yield HomogeneousWindowedBatch(key.with_value(produce_fn(values)))
-
-
 try:
   WindowedValue.timestamp_object = None
 except TypeError:
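As the deleted test below makes concrete, from_windowed_values groups elements by their shared event-time metadata (keying on wv.with_value(None)) and emits one HomogeneousWindowedBatch per group. A small usage sketch of the removed API:

    from apache_beam.utils import windowed_value

    wvs = [
        windowed_value.WindowedValue('a', 3, ()),
        windowed_value.WindowedValue('b', 3, ()),
        windowed_value.WindowedValue('c', 6, ()),
    ]
    batches = list(
        windowed_value.WindowedBatch.from_windowed_values(wvs, produce_fn=list))
    # Yields two batches: ['a', 'b'] at timestamp 3 and ['c'] at timestamp 6.
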
diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py b/sdks/python/apache_beam/utils/windowed_value_test.py
index 1e4892aa9bd..bf4048a9bd0 100644
--- a/sdks/python/apache_beam/utils/windowed_value_test.py
+++ b/sdks/python/apache_beam/utils/windowed_value_test.py
@@ -20,13 +20,9 @@
 # pytype: skip-file
 
 import copy
-import itertools
 import pickle
 import unittest
 
-from parameterized import parameterized
-from parameterized import parameterized_class
-
 from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import Timestamp
 
@@ -76,93 +72,5 @@ class WindowedValueTest(unittest.TestCase):
     self.assertTrue(pickle.loads(pickle.dumps(wv)) == wv)
 
 
-WINDOWED_BATCH_INSTANCES = [
-    windowed_value.HomogeneousWindowedBatch.of(
-        None, 3, (), windowed_value.PANE_INFO_UNKNOWN),
-    windowed_value.HomogeneousWindowedBatch.of(
-        None,
-        3, (),
-        windowed_value.PaneInfo(
-            True, False, windowed_value.PaneInfoTiming.ON_TIME, 0, 0)),
-]
-
-
-class WindowedBatchTest(unittest.TestCase):
-  def test_homogeneous_windowed_batch_with_values(self):
-    pane_info = windowed_value.PaneInfo(
-        True, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 0)
-    wb = windowed_value.HomogeneousWindowedBatch.of(['foo', 'bar'],
-                                                    6, (),
-                                                    pane_info)
-    self.assertEqual(
-        wb.with_values(['baz', 'foo']),
-        windowed_value.HomogeneousWindowedBatch.of(['baz', 'foo'],
-                                                   6, (),
-                                                   pane_info))
-
-  def test_homogeneous_windowed_batch_as_windowed_values(self):
-    pane_info = windowed_value.PaneInfo(
-        True, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 0)
-    wb = windowed_value.HomogeneousWindowedBatch.of(['foo', 'bar'],
-                                                    3, (),
-                                                    pane_info)
-
-    self.assertEqual(
-        list(wb.as_windowed_values(iter)),
-        [
-            windowed_value.WindowedValue('foo', 3, (), pane_info),
-            windowed_value.WindowedValue('bar', 3, (), pane_info)
-        ])
-
-  @parameterized.expand(itertools.combinations(WINDOWED_BATCH_INSTANCES, 2))
-  def test_inequality(self, left_wb, right_wb):
-    self.assertNotEqual(left_wb, right_wb)
-
-  def test_equals_different_type(self):
-    wb = windowed_value.HomogeneousWindowedBatch.of(
-        None, 3, (), windowed_value.PANE_INFO_UNKNOWN)
-    self.assertNotEqual(wb, object())
-
-  def test_homogeneous_from_windowed_values(self):
-    pane_info = windowed_value.PaneInfo(
-        True, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 0)
-
-    windowed_values = [
-        windowed_value.WindowedValue('foofoo', 3, (), pane_info),
-        windowed_value.WindowedValue('foobar', 6, (), pane_info),
-        windowed_value.WindowedValue('foobaz', 9, (), pane_info),
-        windowed_value.WindowedValue('barfoo', 3, (), pane_info),
-        windowed_value.WindowedValue('barbar', 6, (), pane_info),
-        windowed_value.WindowedValue('barbaz', 9, (), pane_info),
-        windowed_value.WindowedValue('bazfoo', 3, (), pane_info),
-        windowed_value.WindowedValue('bazbar', 6, (), pane_info),
-        windowed_value.WindowedValue('bazbaz', 9, (), pane_info),
-    ]
-
-    self.assertEqual(
-        list(
-            windowed_value.WindowedBatch.from_windowed_values(
-                windowed_values, produce_fn=list)),
-        [
-            windowed_value.HomogeneousWindowedBatch.of(
-                ['foofoo', 'barfoo', 'bazfoo'], 3, (), pane_info),
-            windowed_value.HomogeneousWindowedBatch.of(
-                ['foobar', 'barbar', 'bazbar'], 6, (), pane_info),
-            windowed_value.HomogeneousWindowedBatch.of(
-                ['foobaz', 'barbaz', 'bazbaz'], 9, (), pane_info)
-        ])
-
-
-@parameterized_class(('wb', ), [(wb, ) for wb in WINDOWED_BATCH_INSTANCES])
-class WindowedBatchUtilitiesTest(unittest.TestCase):
-  def test_hash(self):
-    wb_copy = copy.copy(self.wb)
-    self.assertFalse(self.wb is wb_copy)
-    self.assertEqual({self.wb: 100}.get(wb_copy), 100)
-
-  def test_pickle(self):
-    self.assertTrue(pickle.loads(pickle.dumps(self.wb)) == self.wb)
-
-
 if __name__ == '__main__':
   unittest.main()