Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/17 01:50:37 UTC

[GitHub] [beam] tvalentyn commented on a diff in pull request #23931: Change Point Analysis

tvalentyn commented on code in PR #23931:
URL: https://github.com/apache/beam/pull/23931#discussion_r1021803276


##########
sdks/python/apache_beam/testing/analyzers/analysis_test.py:
##########
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import unittest
+
+import mock
+import pandas as pd
+
+try:
+  from apache_beam.testing.analyzers import analysis
+except ImportError as e:
+  analysis = None
+
+
+class IgnoreChangePointObject:
+  change_point_sibling_distance = 2
+  changepoint_to_recent_run_window = 2
+
+
+def FakeData(query_template):

Review Comment:
   use snake_case for functions.
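
   For instance (purely illustrative), the fake helper above would be renamed like this, keeping its body as-is:

   ```
   def fake_data(query_template):
     ...
   ```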



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'
+BQ_DATASET = 'beam_perf_storage'
+OWNER = 'apache'
+REPO = 'beam'
+
+ID_LABEL = 'test_id'

Review Comment:
   is this an ID of an execution (of a particular test run)?



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'
+BQ_DATASET = 'beam_perf_storage'
+OWNER = 'apache'
+REPO = 'beam'
+
+ID_LABEL = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10

Review Comment:
    one option (up to you) is to just use this number as the total # of results to display, with the assumption that the change point is in the middle; then you don't need to explain that it's `# from change point index in both directions`
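
   e.g., a rough sketch of what that could look like (variable names below are just for illustration):

   ```
   half = NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION // 2
   start = max(0, change_point_index - half)
   end = min(len(metric_values), start + NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION)
   results_to_display = metric_values[start:end]
   ```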



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'
+BQ_DATASET = 'beam_perf_storage'
+OWNER = 'apache'
+REPO = 'beam'
+
+ID_LABEL = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+_LOGGER = logging.getLogger(__name__)
+
+SCHEMA = [{
+    'name': ID_LABEL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']

Review Comment:
   ```suggestion
   ISSUE_LABELS = ['perf-alert']
   ```



##########
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py:
##########
@@ -616,3 +620,37 @@ def __init__(self):
   def process(self, element):
     yield self.timestamp_val_fn(
         element, self.timestamp_fn(micros=int(self.time_fn() * 1000000)))
+
+
+class FetchMetrics:

Review Comment:
   this class doesn't have any state; the name FetchMetrics denotes an action, while the class has a separate static method for that action.
   
   I propose to make the method into a function.
   
   We could use a class hierarchy like:
   
   ```
   class MetricsFetcher:
      def get_metrics():
          return NotImplemented
   ```
   
   and then have:
   ```
   class BigQueryMetricsFetcher(MetricsFetcher):
      ...
   class InfluxDBMetricsFetcher(MetricsFetcher):
      ...
   ```
   
   but we can add that later when it becomes needed instead of overdesigning. In Python one can also pass functions/callables as arguments if necessary.
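
   e.g., a rough sketch of the callable approach (function and parameter names here are hypothetical):

   ```
   from google.cloud import bigquery
   import pandas as pd

   def fetch_metrics_from_bq(query: str) -> pd.DataFrame:
     # A plain function instead of a stateless class with a static method.
     return bigquery.Client().query(query).result().to_dataframe()

   def analyze_metrics(query: str, fetch_fn=fetch_metrics_from_bq):
     # Callers can swap in any callable with the same shape,
     # e.g. an InfluxDB-backed fetcher later on.
     metric_data = fetch_fn(query)
     return metric_data
   ```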
   
       



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  The config file is provide as command line argument. If no config file was
+  provided on cmd line, the default config file will be used.
+
+  For each test in config yaml file, if the source is the big_query,
+  the expected keys for single test that are required to run the change point
+   analysis are test_name, metrics_dataset, metrics_table, project, metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance, change_point_to_recent_run_window can be
+  # defined in the config file for each test whihc are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None

Review Comment:
   raising a runtime error is more appropriate than an assertion in this case. see: https://softwareengineering.stackexchange.com/questions/15515/when-to-use-assertions-and-when-to-use-exceptions
   
   You can raise a ValueError if `params['source'] != 'big_query'`.
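
   For example, something along these lines (reusing the code from the diff above):

   ```
   if params['source'] == 'big_query':
     metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
         project_name=params['project'],
         dataset=params['metrics_dataset'],
         table=params['metrics_table'],
         metric_name=metric_name)
   else:
     # (TODO): Implement fetching metric_data from InfluxDB.
     raise ValueError(
         'Unsupported source: %s. Only big_query is supported at the moment.' %
         params['source'])
   ```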



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from config file, and if there is a performance
+  regression observed for a test, an alert will filed with GitHub Issues.
+
+  The config file is provide as command line argument. If no config file was
+  provided on cmd line, the default config file will be used.
+
+  For each test in config yaml file, if the source is the big_query,
+  the expected keys for single test that are required to run the change point
+   analysis are test_name, metrics_dataset, metrics_table, project, metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance, change_point_to_recent_run_window can be
+  # defined in the config file for each test whihc are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points
+    change_points_idx.sort(reverse=True)
+    change_point_index = change_points_idx[0]
+    change_point_timestamp = timestamps[change_point_index]
+
+    # check if the change point lies in the valid window.
+    # window - Number of runs between the
+    # change_point_to_recent_run_window run and the most recent run.
+    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
+                                           change_point_index):
+      # change point lies outside the window from the recent run.
+      # Ignore this change point.
+      logging.info(
+          'Performance regression found for the test: %s. '
+          'but not creating an alert since the Change Point '
+          'lies outside the '
+          'change_point_to_recent_run_window distance' % test_name)
+      continue
+
+    # check for sibling change point. Sibling change point is a change
+    # point that lies in the distance of change_point_sibling_distance
+    # in both directions from the current change point index.
+    # Here, distance can be interpreted as number of runs between two change
+    # points. The idea here is that sibling change point will also point to
+    # the same performance regression.
+
+    create_alert, last_created_issue_number = (
+      has_sibling_change_point(
+        change_point_index=change_point_index,
+        change_point_sibling_distance=change_point_sibling_distance,
+        metric_values=metric_values,
+        metric_name=metric_name,
+        test_name=test_name,
+        change_point_timestamp=change_point_timestamp
+      )
+    )
+
+    logging.info(

Review Comment:
   move inside `if`?



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(

Review Comment:
   >   Finds the sibling change point index. If not,
   >    returns the original change point index.
   
   This is not an intuitive behavior for a method called `has_sibling_change_point`.
   
   For a method called `has_sibling_change_point`, I'd expect the result to be something like True/False depending on whether the change point has a sibling or doesn't.
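
   e.g., a sketch of the signature I'd expect (illustrative only):

   ```
   def has_sibling_change_point(
       change_point_index: int,
       change_point_sibling_distance: int,
       metric_values: List,
       metric_name: str,
       test_name: str,
       change_point_timestamp: float) -> bool:
     """Returns True iff a nearby (sibling) change point was already reported."""
     ...
   ```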
   
   



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:

Review Comment:
   It seems unnecessary to iterate over metric_values to determine if a reported changepoint is in the list.
   Can we analyze the timestamp?
   I assume data in metric_values is ordered by submission timestamp. If not, we need to query it so that it is ordered, in order for change point detection to make sense.
   
   If data is ordered by the timestamp, we can identify the timestamps for the ends of the allowed changepoint window. If the reported changepoint is outside of that window, then we have a new changepoint.
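
   Something along these lines (rough sketch, assuming `timestamps` is sorted ascending alongside `metric_values`):

   ```
   # timestamps[i] corresponds to metric_values[i], ordered by submission time.
   window_start = timestamps[
       max(0, change_point_index - change_point_sibling_distance)]
   window_end = timestamps[
       min(len(timestamps) - 1,
           change_point_index + change_point_sibling_distance)]
   # The previously reported change point is a sibling (same regression)
   # only if its timestamp falls inside this window.
   is_sibling = window_start <= previous_change_point_timestamp <= window_end
   ```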



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'
+BQ_DATASET = 'beam_perf_storage'
+OWNER = 'apache'

Review Comment:
   owner of what? can this constant be more descriptive?



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'

Review Comment:
   constants internal/private to the module should be prefixed with underscore, e.g. _BQ_PROJECT_NAME



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs the e-divisive means change point analysis on the data and
+    returns the indices of the change points.
+
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # ignore the change point and don't raise an alert for it.
+  return change_point_to_recent_run_window >= change_point_index
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point
+  for which a GitHub issue has already been created.
+
+  A sibling change point is a neighbor of the latest change point that lies
+  within change_point_sibling_distance runs of it. Returns a tuple
+  (alert, issue_number): alert is False when the change point, or one of
+  its siblings, was already reported; issue_number, when present, is the
+  existing GitHub issue to update.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,

Review Comment:
   nit: this looks pretty confusing:
   
   ```
         metric_name_id=METRIC_NAME,
         metric_name=metric_name,
   ```
   
   I think the string would be easier to read if you write it as an f-string, such as: 
   ```
   query_template = (
            ...
            f"WHERE {METRIC_NAME} = '{metric_name}'" 
            ...
   ```
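   
   For illustration, a fuller (hypothetical) sketch of that suggestion, assuming the surrounding module constants keep their current names, could be:
   ```
   query_template = f"""
   SELECT * FROM {_BQ_PROJECT_NAME}.{_BQ_DATASET}.{test_name}
   WHERE {METRIC_NAME} = '{metric_name}'
   ORDER BY {ISSUE_CREATION_TIMESTAMP_LABEL} DESC
   LIMIT 10
   """
   ```
   This keeps the SQL readable and makes it obvious which placeholders are column identifiers (module constants) and which are runtime values.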



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'
+BQ_DATASET = 'beam_perf_storage'
+OWNER = 'apache'
+REPO = 'beam'
+
+ID_LABEL = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+_LOGGER = logging.getLogger(__name__)
+
+SCHEMA = [{
+    'name': ID_LABEL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.

Review Comment:
   i would use median instead of mean. 
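   
   As a rough sketch (reusing the names already in this module), the medians on either side of the change point could be computed as:
   ```
   import numpy as np

   # metric_values and change_point_index are assumed to be the values and
   # index already computed by the analysis.
   median_before = np.median(metric_values[:change_point_index])
   median_after = np.median(metric_values[change_point_index:])
   ```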



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds the metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs the e-divisive means change point analysis on the data and
+    returns the indices of the change points.
+
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # ignore the change point and don't raise an alert for it.
+  return change_point_to_recent_run_window >= change_point_index
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point
+  for which a GitHub issue has already been created.
+
+  A sibling change point is a neighbor of the latest change point that lies
+  within change_point_sibling_distance runs of it. Returns a tuple
+  (alert, issue_number): alert is False when the change point, or one of
+  its siblings, was already reported; issue_number, when present, is the
+  existing GitHub issue to update.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:

Review Comment:
   it looks like args is a single name for a config file. in this case, let's pass:
   
   ```
   run(config_file_path)
   ```
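   
   As a sketch (not the final signature), something like:
   ```
   def run(config_file_path: Optional[str] = None) -> None:
     if config_file_path is None:
       config_file_path = os.path.join(
           os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
     ...
   ```
   keeping the argparse handling in the `__main__` block that calls run().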



##########
sdks/python/apache_beam/testing/analyzers/tests_config.yaml:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# NOTE:
+# change_point_sibling_distance: the unit for this variable is the number of runs.
+# So if a test runs 2 times a day, and we want to look for a sibling change point
+# in the runs that happened 3 days before and after the change point,
+# then this value should be 3 * 2 = 6.
+
+# change_point_to_recent_run_window: same units as change_point_sibling_distance.
+
+test_1:
+  test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks_1
+  source: big_query
+  metrics_dataset: beam_run_inference
+  metrics_table: torch_inference_imagenet_results_resnet152
+  project: apache-beam-testing
+  metric_name: mean_load_model_latency_milli_secs
+  labels:
+    - perf_alert

Review Comment:
   re labels:
   perf-alert (- instead of _), also let's add:
   run-inference
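   
   i.e. something like:
   ```
   labels:
     - perf-alert
     - run-inference
   ```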



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+
+
+def create_or_comment_issue(
+    title: str,
+    description: str,
+    labels: Optional[List] = None,
+    issue_number: Optional[int] = None) -> Tuple[int, str]:
+  """
+  Create an issue with the given title and description, and optional labels.
+  If an issue is already created and is open,
+  then comment on the issue instead of creating a duplicate issue.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+    issue_number: GitHub issue number used to find the already created issue.
+  """
+  if issue_number:
+    commented_on_issue, comment_url = comment_on_issue(
+      issue_number=issue_number,
+      comment_description=description)
+    if commented_on_issue:
+      return issue_number, comment_url
+
+  # Issue number was not provided or issue with provided number
+  # is closed. In that case, create a new issue.
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME)
+  data = {
+      'owner': _BEAM_REPO_OWNER,
+      'repo': _BEAM_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with the provided issue_number. If an open
+  issue is found, it comments on the open issue with the provided
+  description; otherwise it does nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_REPO_OWNER,
+          'repo': _BEAM_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS)
+  status_code = open_issue_response.status_code
+  open_issue_response = open_issue_response.json()
+  if status_code == 200 and open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_REPO_OWNER,
+        'repo': _BEAM_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(

Review Comment:
   one suggestion on the design also was to re-apply an `awaiting triage` label when we are bumping up an existing issue.
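   
   For example, a rough sketch using the same requests/_HEADERS pattern as this module (the exact label name here is an assumption):
   ```
   def add_awaiting_triage_label(issue_number: int):
     # Re-apply the triage label so bumped issues show up in triage again.
     url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
         _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
     requests.post(
         url, json.dumps({'labels': ['awaiting triage']}), headers=_HEADERS)
   ```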



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds the metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs the e-divisive means change point analysis on the data and
+    returns the indices of the change points.
+
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # ignore the change point and don't raise an alert for it.
+  return change_point_to_recent_run_window >= change_point_index
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point
+  for which a GitHub issue has already been created.
+
+  A sibling change point is a neighbor of the latest change point that lies
+  within change_point_sibling_distance runs of it. Returns a tuple
+  (alert, issue_number): alert is False when the change point, or one of
+  its siblings, was already reported; issue_number, when present, is the
+  existing GitHub issue to update.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data, which is read from the config file; if a performance regression is
+  observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file was
+  provided on the command line, the default config file will be used.
+
+  For each test in the config yaml file, if the source is big_query,

Review Comment:
   Let's move the comment on config structure to the beginning of the config file or README.



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'
+BQ_DATASET = 'beam_perf_storage'
+OWNER = 'apache'
+REPO = 'beam'
+
+ID_LABEL = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+_LOGGER = logging.getLogger(__name__)
+
+SCHEMA = [{
+    'name': ID_LABEL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']

Review Comment:
   is it possible to configure labels for individual tests in the config file? for example: python tests will add a `python` label, and tests can also add an `io` label, etc.
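   
   The per-test `labels` key already present in tests_config.yaml could be reused for this, e.g. (sketch):
   ```
   test_1:
     # ... other keys as above ...
     labels:
       - perf-alerts
       - python
       - io
   ```
   and the analysis script would then pass `params['labels']` through when creating or commenting on the issue.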



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds the metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs the e-divisive means change point analysis on the data and
+    returns the indices of the change points.
+
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # ignore the change point and don't raise an alert for it.
+  return change_point_to_recent_run_window >= change_point_index
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point
+  for which a GitHub issue has already been created.
+
+  A sibling change point is a neighbor of the latest change point that lies
+  within change_point_sibling_distance runs of it. Returns a tuple
+  (alert, issue_number): alert is False when the change point, or one of
+  its siblings, was already reported; issue_number, when present, is the
+  existing GitHub issue to update.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data, which is read from the config file; if a performance regression is
+  observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file was

Review Comment:
   run() methods shouldn't make any assumptions about, or document, where the config file is taken from. as far as this docstring is concerned, the config file is a parameter to this function.
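   
   i.e. a docstring closer to (sketch):
   ```
   def run(config_file_path: str) -> None:
     """Runs change point analysis on the tests described in the config file
     at config_file_path and files/updates a GitHub issue when a performance
     regression is detected."""
   ```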



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds the metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs the e-divisive means change point analysis on the data and
+    returns the indices of the change points.
+
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # ignore the change point and don't raise an alert for it.
+  return change_point_to_recent_run_window >= change_point_index
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether the current change point has a sibling change point
+  for which a GitHub issue has already been created.
+
+  A sibling change point is a neighbor of the latest change point that lies
+  within change_point_sibling_distance runs of it. Returns a tuple
+  (alert, issue_number): alert is False when the change point, or one of
+  its siblings, was already reported; issue_number, when present, is the
+  existing GitHub issue to update.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data, which is read from the config file; if a performance regression is
+  observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file was
+  provided on the command line, the default config file will be used.
+
+  For each test in the config yaml file, if the source is big_query,
+  the expected keys for a single test that are required to run the change
+  point analysis are test_name, metrics_dataset, metrics_table, project,
+  and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance, change_point_to_recent_run_window can be
+  # defined in the config file for each test, which are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(

Review Comment:
   we should probably fetch last X runs here. 
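   
   e.g. by limiting the fetch to the most recent rows; roughly (the timestamp column name and the limit value here are assumptions, the other names come from the config):
   ```
   SELECT *
   FROM {project}.{metrics_dataset}.{metrics_table}
   ORDER BY timestamp DESC
   LIMIT {num_runs_to_fetch}
   ```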



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+
+
+def create_or_comment_issue(
+    title: str,
+    description: str,
+    labels: Optional[List] = None,
+    issue_number: Optional[int] = None) -> Tuple[int, str]:
+  """
+  Create an issue with the given title and description, and optional labels.
+  If an issue is already created and is open,
+  then comment on the issue instead of creating a duplicate issue.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+    issue_number: GitHub issue number used to find the already created issue.
+  """
+  if issue_number:
+    commented_on_issue, comment_url = comment_on_issue(
+      issue_number=issue_number,
+      comment_description=description)
+    if commented_on_issue:
+      return issue_number, comment_url
+
+  # Issue number was not provided or issue with provided number
+  # is closed. In that case, create a new issue.
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME)
+  data = {
+      'owner': _BEAM_REPO_OWNER,
+      'repo': _BEAM_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  This method looks for an issue with the provided issue_number. If an open
+  issue is found, it comments on the open issue with the provided
+  description; otherwise it does nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue with the using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_REPO_OWNER,
+          'repo': _BEAM_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS)
+  status_code = open_issue_response.status_code
+  open_issue_response = open_issue_response.json()
+  if status_code == 200 and open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_REPO_OWNER,
+        'repo': _BEAM_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), headers=_HEADERS)
+    return True, response.json()['html_url']
+
+  return False, None
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List,
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps at which the metrics were published to the
+    database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index of the change point in metric_values.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display, len(metric_values))

Review Comment:
   i think you need to have `change_point_index + max_results_to_display + 1` here
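
   A small self-contained example of the off-by-one, assuming the indices are
   then taken with an exclusive upper bound (a slice or range); without the
   `+ 1` only `max_results_to_display - 1` values after the change point are
   shown:
   ```
   metric_values = list(range(20))
   change_point_index = 5
   max_results_to_display = 5

   upper_bound = min(change_point_index + max_results_to_display,
                     len(metric_values))
   print(metric_values[change_point_index:upper_bound])    # [5, 6, 7, 8, 9]

   upper_bound = min(change_point_index + max_results_to_display + 1,
                     len(metric_values))
   print(metric_values[change_point_index:upper_bound])    # [5, 6, ..., 10]
   ```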



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script runs Change Point Analysis using a config file. The config
+# file holds the parameters required to fetch data and to run the change
+# point analysis. Change Point Analysis is used to find performance
+# regressions for benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs e-divisive means on the data and returns the indices of the
+    change points.
+
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # ignore the change point and don't raise an alert for it.
+  return change_point_to_recent_run_window >= change_point_index
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Checks whether a sibling change point has already been reported.
+  Returns a tuple of (whether to alert, issue number to comment on, if any).
+
+  A sibling change point is a neighbor of the latest
+  change point, within the distance of change_point_sibling_distance.
+  For a sibling change point, a GitHub issue has already been created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table is found, this is the first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for change point analysis on test metric
+  data, which is read from a config file. If a performance regression
+  is observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file is used.
+
+  For each test in the config yaml file, if the source is big_query,
+  the keys required to run the change point analysis are test_name,
+  metrics_dataset, metrics_table, project and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can be
+  # defined in the config file for each test and are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # Replace '.' with '_' in test_name, since the test name is used later
+    # as a BQ table name and BQ table names don't accept '.' characters.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # TODO: Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points
+    change_points_idx.sort(reverse=True)
+    change_point_index = change_points_idx[0]
+    change_point_timestamp = timestamps[change_point_index]
+
+    # check if the change point lies in the valid window.
+    # window - Number of runs between the
+    # change_point_to_recent_run_window run and the most recent run.
+    if not is_change_point_in_valid_window(change_point_to_recent_run_window,
+                                           change_point_index):
+      # change point lies outside the window from the recent run.
+      # Ignore this change point.
+      logging.info(
+          'Performance regression found for the test: %s, '
+          'but not creating an alert since the change point '
+          'lies outside the '
+          'change_point_to_recent_run_window distance.' % test_name)
+      continue
+
+    # check for sibling change point. Sibling change point is a change
+    # point that lies in the distance of change_point_sibling_distance
+    # in both directions from the current change point index.
+    # Here, distance can be interpreted as number of runs between two change
+    # points. The idea here is that sibling change point will also point to
+    # the same performance regression.
+
+    create_alert, last_created_issue_number = (
+      has_sibling_change_point(
+        change_point_index=change_point_index,
+        change_point_sibling_distance=change_point_sibling_distance,
+        metric_values=metric_values,
+        metric_name=metric_name,
+        test_name=test_name,
+        change_point_timestamp=change_point_timestamp
+      )
+    )
+
+    logging.info(
+        "Create performance alert for the "
+        "test %s: %s" % (test_name, create_alert))
+
+    if create_alert:
+      # Get the issue description used to create a GH issue or
+      # to comment on an open GH issue.
+      issue_description = get_issue_description(
+          metric_name=metric_name,
+          timestamps=timestamps,
+          metric_values=metric_values,
+          change_point_index=change_point_index,
+          max_results_to_display=NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION)
+
+      issue_number, issue_url = create_or_comment_issue(
+        title=TITLE_TEMPLATE.format(test_name, metric_name),
+        description=issue_description,
+        labels=labels,
+        issue_number=last_created_issue_number
+      )
+
+      logging.info(
+          'Performance regression alert is filed on issue #%s. Link to '
+          'the issue: %s' % (issue_number, issue_url))
+
+      issue_meta_data_dict = GitHubIssueMetaData(

Review Comment:
   nit: Metadata is typically spelled as one word.
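
   For illustration only, the same container with the one-word spelling,
   sketched as a dataclass (the field set is taken from this PR; the field
   types are guesses and the dataclass form itself is just a suggestion):
   ```
   from dataclasses import dataclass


   @dataclass
   class GitHubIssueMetadata:
     """Metadata published to BigQuery when a GitHub issue is filed."""
     issue_creation_timestamp: float
     change_point_timestamp: float
     test_name: str
     metric_name: str
     issue_number: int
     issue_url: str
     test_id: str
     change_point: float
   ```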



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+_BEAM_REPO_OWNER = 'AnandInguva'

Review Comment:
   will we have to update this before submission?
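
   One hedged option (not something in this PR) is to default to the upstream
   repo and allow overriding it through environment variables while testing
   from a fork; the variable names below are hypothetical:
   ```
   import os

   # Hypothetical: default to apache/beam, override via env vars when testing.
   _BEAM_REPO_OWNER = os.environ.get('BEAM_PERF_ALERT_REPO_OWNER', 'apache')
   _BEAM_REPO_NAME = os.environ.get('BEAM_PERF_ALERT_REPO_NAME', 'beam')
   ```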



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+    # check if the change point lies in the valid window.
+    # window - Number of runs between the
+    # change_point_to_recent_run_window run and the most recent run.

Review Comment:
   see if the suggestion above makes it easier to understand the meaning of the flag.
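
   As a sketch of how the check might read with a more descriptive name; the
   parameter name below is a hypothetical stand-in, and the logic mirrors the
   PR's assumption that lower indices correspond to more recent runs:
   ```
   def is_change_point_in_valid_window(
       num_runs_in_change_point_window: int, change_point_index: int) -> bool:
     # The change point is considered recent enough when it is at most
     # num_runs_in_change_point_window runs behind the most recent run.
     return change_point_index <= num_runs_in_change_point_window
   ```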



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points

Review Comment:
   Why are we considering latest changepoints instead of earlier changepoints?
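
   For reference, a small sketch of the two behaviours being compared here;
   which one is wanted is exactly the open question:
   ```
   # The PR sorts descending and keeps index 0, i.e. the change point with the
   # largest index; the alternative would keep the smallest index instead.
   change_points_idx = [2, 7, 11]

   change_points_idx.sort(reverse=True)
   latest_change_point = change_points_idx[0]     # 11, what the PR does
   earliest_change_point = change_points_idx[-1]  # 2, the alternative
   ```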



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+def has_sibling_change_point(

Review Comment:
   for the intent that you have in the method, i'd have something like:
   
   ```
   find_existing_issue(
      test_name,
      metric_name,
      max_timestamp
   ) -> Optional[Tuple[bool, int]]
   
   """
      Finds the most recent github issue created for change points for this test+metric.
      Returns an issue ID and a boolean whether the issue needs to be updated.
      If an existing issue was last updated before max_timestamp, it needs an update.
   """
   ```
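
   A rough sketch of what that could look like, reusing the BigQuery query and
   the module-level constants from this PR; the exact return convention
   (a "needs update" flag plus the issue number) is an assumption:
   ```
   def find_existing_issue(
       test_name: str,
       metric_name: str,
       max_timestamp) -> Optional[Tuple[bool, int]]:
     """Finds the most recent GitHub issue filed for this test and metric.

     Returns (needs_update, issue_number), or None when no issue exists yet.
     The issue needs an update when it was last updated before max_timestamp.
     """
     query = """
     SELECT * FROM {project}.{dataset}.{table}
     WHERE {metric_name_id} = '{metric_name}'
     ORDER BY {timestamp} DESC
     LIMIT 1
     """.format(
         project=_BQ_PROJECT_NAME,
         dataset=_BQ_DATASET,
         metric_name_id=METRIC_NAME,
         metric_name=metric_name,
         timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
         table=test_name)
     try:
       df = FetchMetrics.fetch_from_bq(query_template=query)
     except exceptions.NotFound:
       return None
     issue_number = df[ISSUE_NUMBER].tolist()[0]
     last_issue_timestamp = df[ISSUE_CREATION_TIMESTAMP_LABEL].tolist()[0]
     return last_issue_timestamp < max_timestamp, issue_number
   ```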



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10

Review Comment:
   constants internal to the module should be prefixed with _
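
   For example (names copied from the PR, with only a leading underscore
   added):
   ```
   _UNIQUE_ID = 'test_id'
   _ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
   _NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
   ```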



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data, which is read from the config file. If a performance regression is
+  observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file is used.
+
+  For each test in the config YAML file whose source is big_query, the keys
+  required to run the change point analysis are test_name, metrics_dataset,
+  metrics_table, project, and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can be
+  # defined in the config file for each test; they are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']

Review Comment:
   naming suggestion: change_point_sibling_distance -> min_runs_between_changepoints



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs e-divisive means on the data and returns the indices of the
+    change points.
+
+    Args:
+     pvalue: The p-value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(

Review Comment:
   > returns the original change point index.
   
   this doesn't match what I'm seeing. Looks like the code returns a boolean and an existing issue id.
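
   For what it's worth, here is a stub of a signature and docstring that would match the behavior as I read it (an alert flag plus an existing issue number); the body is a placeholder, not the PR's implementation:

   ```python
   from typing import List, Optional, Tuple


   def has_sibling_change_point(
       change_point_index: int,
       change_point_sibling_distance: int,
       metric_values: List[float],
       metric_name: str,
       test_name: str,
       change_point_timestamp: float,
   ) -> Tuple[bool, Optional[int]]:
     """Decides whether this change point should be alerted on.

     Returns:
       A tuple (alert, issue_number): alert is True when an alert should be
       raised; issue_number is the existing GitHub issue to comment on, or
       None when a new issue should be created (or no alert is needed).
     """
     raise NotImplementedError  # stub only; see the PR for the real logic
   ```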



##########
sdks/python/apache_beam/testing/analyzers/analysis.py:
##########
@@ -0,0 +1,484 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import argparse
+import json
+import logging
+import os
+import time
+import uuid
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import google.api_core.exceptions
+import numpy as np
+import pandas as pd
+import requests
+import yaml
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+BQ_PROJECT_NAME = 'apache-beam-testing'
+BQ_DATASET = 'beam_perf_storage'
+OWNER = 'apache'
+REPO = 'beam'
+
+ID_LABEL = 'test_id'

Review Comment:
   what does _LABEL refer to?



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs e-divisive means on the data and returns the indices of the
+    change points.
+
+    Args:
+     pvalue: The p-value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data, which is read from the config file. If a performance regression is
+  observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file is used.
+
+  For each test in the config YAML file whose source is big_query, the keys
+  required to run the change point analysis are test_name, metrics_dataset,
+  metrics_table, project, and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can be
+  # defined in the config file for each test; they are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']

Review Comment:
   naming suggestion:
   change_point_to_recent_run_window -> num_runs_in_changepoint_window
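
   For illustration, a parsed config entry for one test could then look roughly like this (keys follow the suggested names; all values below are placeholders rather than entries from tests_config.yaml):

   ```python
   example_test_entry = {
       'test_name': 'some.module.SomeLoadTest',  # placeholder
       'source': 'big_query',
       'project': 'apache-beam-testing',
       'metrics_dataset': 'some_dataset',        # placeholder
       'metrics_table': 'some_table',            # placeholder
       'metric_name': 'runtime',                 # placeholder
       'labels': ['perf-alerts'],
       'min_runs_between_changepoints': 4,       # was change_point_sibling_distance
       'num_runs_in_changepoint_window': 7,      # was change_point_to_recent_run_window
   }
   ```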



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs e-divisive means on the data and returns the indices of the
+    change points.
+
+    Args:
+     pvalue: The p-value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data, which is read from the config file. If a performance regression is
+  observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file is used.
+
+  For each test in the config YAML file whose source is big_query, the keys
+  required to run the change point analysis are test_name, metrics_dataset,
+  metrics_table, project, and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can be
+  # defined in the config file for each test; they are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)

Review Comment:
   do we pass the entire dataset? I think we should select an appropriate subset of recent runs, perhaps up to the window or a slightly larger buffer. If there are not enough datapoints we should skip this check too?
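
   Roughly, something like this sketch (the helper name, thresholds, and parameter names below are made up for illustration, not code from this PR):

   ```python
   import logging
   from typing import Optional, Sequence

   from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive

   _MIN_DATA_POINTS = 10  # assumption: below this, skip the analysis entirely
   _BUFFER_FACTOR = 2     # look back somewhat further than the alerting window


   def latest_change_point_in_window(
       metric_values: Sequence[float],
       num_runs_in_changepoint_window: int) -> Optional[int]:
     """Runs e-divisive on a recent slice of the runs only.

     Returns the index (in the full series) of the most recent change point,
     or None when there is not enough data or no change point is found.
     """
     lookback = num_runs_in_changepoint_window * _BUFFER_FACTOR
     recent = list(metric_values)[-lookback:]
     if len(recent) < _MIN_DATA_POINTS:
       logging.info('Not enough runs; skipping change point analysis.')
       return None
     change_points = e_divisive(recent)  # default pvalue/permutations
     if not change_points:
       return None
     # Translate the index back into the full series' coordinates.
     return len(metric_values) - len(recent) + max(change_points)
   ```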



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs e-divisive means on the data and returns the indices of the
+    change points.
+
+    Args:
+     pvalue: The p-value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point for running change point analysis on test metric
+  data, which is read from the config file. If a performance regression is
+  observed for a test, an alert is filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file
+  is provided on the command line, the default config file is used.
+
+  For each test in the config YAML file whose source is big_query, the keys
+  required to run the change point analysis are test_name, metrics_dataset,
+  metrics_table, project, and metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance and change_point_to_recent_run_window can be
+  # defined in the config file for each test; they are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points
+    change_points_idx.sort(reverse=True)
+    change_point_index = change_points_idx[0]
+    change_point_timestamp = timestamps[change_point_index]
+
+    # check if the change point lies in the valid window.

Review Comment:
   logging message above should sufficiently explain this, and the comment shouldn't be necessary.
   
   As a rule, if you are explaining WHAT the code is doing in a comment, it's a sign of unclear code or an excessive comment. Explaining WHY the code is doing something, on the other hand, is sometimes helpful.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:

Review Comment:
   make this a dataclass. Then you can use dataclasses.asdict().
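
   A sketch of that (the field types are guesses, since the current __init__ is untyped). One caveat: dataclasses.asdict() keys the dict by the field names, so either the BQ schema column names would need to match the field names or a small mapping like the current as_dict() is still needed:

   ```python
   from dataclasses import asdict, dataclass


   @dataclass
   class GitHubIssueMetaData:
     """Metadata published to BigQuery when a GitHub issue is filed for a
     performance alert."""
     issue_creation_timestamp: float
     change_point_timestamp: float
     test_name: str
     metric_name: str
     issue_number: int
     issue_url: str
     test_id: str
     change_point: float


   # asdict(GitHubIssueMetaData(...)) then replaces the hand-written as_dict().
   ```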



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:

Review Comment:
   does this param need to be tied to changepoint lookup window?  curious, why 100? you could also add more context or a link to documentation for how/when to tune these params.
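
   For reference, the two arguments map directly onto the e_divisive call from signal_processing_algorithms, so they could also be surfaced per test in the config rather than hard-coded. A toy sketch (the config keys used here are hypothetical):

   ```python
   from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive

   # Stand-in for one test's config entry; the 'pvalue'/'permutations' keys are made up.
   params = {'pvalue': 0.05, 'permutations': 100}
   # Toy metric series with an obvious level shift halfway through.
   series = [10.1, 10.3, 9.9, 10.2, 14.8, 15.1, 15.0, 14.9]

   change_points = e_divisive(
       series, params.get('pvalue', 0.05), params.get('permutations', 100))
   print(change_points)  # indices where the distribution appears to shift, if any
   ```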



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]
+
+  # Check if the current change point is equal to the
+  # latest reported change point on GitHub Issues using the
+  # value and the timestamp. If it's the same,
+  # don't create or comment on issues.
+  if ((previous_change_point == metric_values[change_point_index]) and
+      (previous_change_point_timestamp == change_point_timestamp)):
+    return False, None
+
+  alert = True
+  for sibling_index in sibling_indexes_to_search:
+    if metric_values[sibling_index] == previous_change_point:
+      alert = False
+      return alert, None
+  return alert, issue_number
+
+
+def read_test_config(config_file_path: str) -> Dict:
+  """
+  Reads the config file in which the data required to
+  run the change point analysis is specified.
+  """
+  with open(config_file_path, 'r') as stream:
+    config = yaml.safe_load(stream)
+  return config
+
+
+def run(args) -> None:
+  """
+  run is the entry point to run change point analysis on test metric
+  data, which is read from the config file. If a performance regression is
+  observed for a test, an alert will be filed with GitHub Issues.
+
+  The config file is provided as a command line argument. If no config file is
+  provided on the command line, the default config file will be used.
+
+  For each test in the config yaml file, if the source is big_query, the
+  expected keys for a single test that are required to run the change point
+  analysis are test_name, metrics_dataset, metrics_table, project, metric_name.
+
+  """
+  config_file_path = args.config_file_path
+  if config_file_path is None:
+    config_file_path = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml')
+
+  tests_config: Dict[Dict[str, Any]] = read_test_config(config_file_path)
+
+  # change_point_sibling_distance, change_point_to_recent_run_window can be
+  # defined in the config file for each test, which are used
+  # to avoid filing GitHub issues for duplicate change points. Please take
+  # a look at the README for more information on the parameters defined in the
+  # config file.
+  for _, params in tests_config.items():
+    metric_name = params['metric_name']
+    # replace . with _ in test_name. This test name would be used later
+    # as a BQ table name and the BQ table doesn't accept . in the name.
+    test_name = params['test_name'].replace('.', '_')
+    if params['source'] == 'big_query':
+      metric_data: pd.DataFrame = FetchMetrics.fetch_from_bq(
+          project_name=params['project'],
+          dataset=params['metrics_dataset'],
+          table=params['metrics_table'],
+          metric_name=metric_name)
+    else:
+      # (TODO): Implement fetching metric_data from InfluxDB.
+      params = None
+    assert params is not None
+
+    labels = params['labels']
+    change_point_sibling_distance = params['change_point_sibling_distance']
+    change_point_to_recent_run_window = params[
+        'change_point_to_recent_run_window']
+
+    metric_values = metric_data[load_test_metrics_utils.VALUE_LABEL]
+    timestamps = metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL]
+
+    # run change point analysis on the metric_values using edivisive means
+    cp_analyzer = ChangePointAnalysis(
+        metric_name=metric_name, data=metric_values)
+
+    change_points_idx = cp_analyzer.edivisive_means()
+    # No change point found. Continue on to the next test.
+    if not change_points_idx:
+      continue
+
+    # always consider the latest change points
+    change_points_idx.sort(reverse=True)
+    change_point_index = change_points_idx[0]
+    change_point_timestamp = timestamps[change_point_index]
+
+    # check if the change point lies in the valid window.
+    # window - Number of runs between the
+    # change_point_to_recent_run_window run and the most recent run.
+    if not is_change_point_in_valid_window(change_point_to_recent_run_window,

Review Comment:
   I think if we consider only a subset of the last runs, as noted above, we wouldn't need to do this check.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):

Review Comment:
   In such cases you should be able to modify the loop bounds so as to avoid another `if`. Then the loop itself becomes unnecessary.
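   For example, a minimal sketch of that idea (using the PR's variable names; not part of the PR itself):

   ```python
   # Derive the candidate sibling indices directly from the range bounds,
   # so no per-element `if` is needed and the explicit loops go away.
   backward = list(range(change_point_index - 1,
                         max(change_point_index - change_point_sibling_distance, 0) - 1,
                         -1))
   forward = list(range(change_point_index + 1,
                        min(change_point_index + change_point_sibling_distance + 1,
                            len(metric_values))))
   sibling_indexes_to_search = backward + forward
   ```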



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):

Review Comment:
   same here



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:

Review Comment:
   simpler syntax: `return change_point_to_recent_run_window >= change_point_index`
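   Applied to the helper, that would read (sketch):

   ```python
   def is_change_point_in_valid_window(
       change_point_to_recent_run_window: int, change_point_index: int) -> bool:
     # The change point is recent enough to alert on only if it falls
     # within the configured lookback window.
     return change_point_to_recent_run_window >= change_point_index
   ```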



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """

Review Comment:
   I think a performance improvement will also trigger a changepoint here, so we should say "Performance regression or improvement".
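   e.g. something along these lines (wording only a suggestion, not part of the PR):

   ```python
   TITLE_TEMPLATE = """
     Performance change (regression or improvement) found in {}:{}
   """
   ```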



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.

Review Comment:
   I think it makes sense to consider the last N runs when we are looking for a change point, but once we find a changepoint, we should file an alert if an alert for this changepoint (or a sibling changepoint) has not yet been filed.
   
   If an alert has not been filed but we still see a changepoint, we should file an alert.



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]

Review Comment:
   What is the changepoint that is being stored? A particular reading of the metric, right? What is its type? Float? If so, we should compare with `math.isclose()`.
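   For example (a sketch, assuming both values are floats read back from BigQuery; variable names follow the PR):

   ```python
   import math

   # Compare the stored change point against the current metric reading with a
   # tolerance instead of `==`, since both are floats round-tripped through BigQuery.
   same_change_point = (
       math.isclose(previous_change_point, metric_values[change_point_index]) and
       previous_change_point_timestamp == change_point_timestamp)
   if same_change_point:
     return False, None
   ```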



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script is used to run Change Point Analysis using a config file.
+# config file holds the parameters required to fetch data, and to run the
+# change point analysis. Change Point Analysis is used to find Performance
+# regressions for Benchmark/load/performance test.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# number of results to display on the issue description
+# from change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to the
+  BigQuery when a GitHub issue is created on a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Performs edivisive means on the data and returns the indices of the
+    Change points.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # Ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,
+) -> Optional[Tuple[bool, Optional[int]]]:
+  """
+  Finds the sibling change point index. If not,
+  returns the original change point index.
+
+  Sibling change point is a neighbor of latest
+  change point, within the distance of change_point_sibling_distance.
+  For sibling change point, a GitHub issue is already created.
+  """
+
+  # Search backward from the current change point
+  sibling_indexes_to_search = []
+  for i in range(change_point_index - 1, -1, -1):
+    if change_point_index - i <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Search forward from the current change point
+  for i in range(change_point_index + 1, len(metric_values)):
+    if i - change_point_index <= change_point_sibling_distance:
+      sibling_indexes_to_search.append(i)
+  # Look for change points within change_point_sibling_distance.
+  # Return the first change point found.
+  query_template = """
+  SELECT * FROM {project}.{dataset}.{table}
+  WHERE {metric_name_id} = '{metric_name}'
+  ORDER BY {timestamp} DESC
+  LIMIT 10
+  """.format(
+      project=_BQ_PROJECT_NAME,
+      dataset=_BQ_DATASET,
+      metric_name_id=METRIC_NAME,
+      metric_name=metric_name,
+      timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
+      table=test_name)
+  try:
+    df = FetchMetrics.fetch_from_bq(query_template=query_template)
+  except exceptions.NotFound:
+    # If no table found, that means this is first performance regression
+    # on the current test:metric.
+    return True, None
+  previous_change_point = df[CHANGE_POINT_LABEL].tolist()[0]
+  previous_change_point_timestamp = df[CHANGEPOINT_TIMESTAMP_LABEL].tolist()[0]
+  issue_number = df[ISSUE_NUMBER].tolist()[0]

Review Comment:
   nit: there is no point in fetching 10 records if we only look at 1.
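   i.e. the query could just be (sketch, reusing the PR's constants and variables):

   ```python
   # Only the latest record is read below, so LIMIT 1 is enough.
   query_template = """
     SELECT * FROM {project}.{dataset}.{table}
     WHERE {metric_name_id} = '{metric_name}'
     ORDER BY {timestamp} DESC
     LIMIT 1
     """.format(
       project=_BQ_PROJECT_NAME,
       dataset=_BQ_DATASET,
       metric_name_id=METRIC_NAME,
       metric_name=metric_name,
       timestamp=ISSUE_CREATION_TIMESTAMP_LABEL,
       table=test_name)
   df = FetchMetrics.fetch_from_bq(query_template=query_template)
   previous_change_point = df[CHANGE_POINT_LABEL].iloc[0]
   ```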



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+
+
+def create_or_comment_issue(
+    title: str,
+    description: str,
+    labels: Optional[List] = None,
+    issue_number: Optional[int] = None) -> Tuple[int, str]:
+  """
+  Create an issue with the given title and description, optionally
+  tagged with labels. If a matching issue already exists and is open,
+  comment on that issue instead of creating a duplicate.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+    issue_number: GitHub issue number used to find the already created issue.
+  """
+  if issue_number:
+    commented_on_issue, comment_url = comment_on_issue(
+      issue_number=issue_number,
+      comment_description=description)
+    if commented_on_issue:
+      return issue_number, comment_url
+
+  # Issue number was not provided or issue with provided number
+  # is closed. In that case, create a new issue.
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME)
+  data = {
+      'owner': _BEAM_REPO_OWNER,
+      'repo': _BEAM_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  Looks for an issue with the provided issue_number. If an open issue
+  is found, comments on it with the provided description; otherwise
+  does nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_REPO_OWNER,
+          'repo': _BEAM_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS)
+  status_code = open_issue_response.status_code
+  open_issue_response = open_issue_response.json()
+  if status_code == 200 and open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_REPO_OWNER,
+        'repo': _BEAM_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), headers=_HEADERS)
+    return True, response.json()['html_url']
+
+  return False, None
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List,
+    metric_values: List,
+    change_point_index: int,
+    max_results_to_display: int = 5) -> str:
+  """
+  Args:
+   metric_name: Metric name used for the Change Point Analysis.
+   timestamps: Timestamps of the metrics when they were published to the
+    Database.
+   metric_values: Values of the metric for the previous runs.
+   change_point_index: Index for the change point. The element in the
+    index of the metric_values would be the change point.
+   max_results_to_display: Max number of results to display from the change
+    point index, in both directions of the change point index.
+
+  Returns:
+    str: Description used to fill the GitHub issues description.
+  """
+
+  # TODO: Add mean and median before and after the changepoint index.
+  indices_to_display = []
+  upper_bound = min(
+      change_point_index + max_results_to_display, len(metric_values))
+  for i in range(change_point_index, upper_bound):
+    indices_to_display.append(i)
+  lower_bound = max(0, change_point_index - max_results_to_display)
+  for i in range(lower_bound, change_point_index):
+    indices_to_display.append(i)
+  indices_to_display.sort()
+  description = _ISSUE_DESCRIPTION_HEADER.format(metric_name) + 2 * '\n'
+  for i in indices_to_display:

Review Comment:
   why not have a single loop from lower_bound to upper_bound?
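   For example (sketch, using the same variable names as in the PR):

   # A single range covers both directions around the change point, and
   # range() is already ordered, so the explicit sort() goes away as well.
   lower_bound = max(0, change_point_index - max_results_to_display)
   upper_bound = min(
       change_point_index + max_results_to_display, len(metric_values))
   indices_to_display = list(range(lower_bound, upper_bound))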



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+
+
+def create_or_comment_issue(
+    title: str,
+    description: str,
+    labels: Optional[List] = None,
+    issue_number: Optional[int] = None) -> Tuple[int, str]:
+  """
+  Create an issue with the given title and description, optionally
+  tagged with labels. If a matching issue already exists and is open,
+  comment on that issue instead of creating a duplicate.
+
+  Args:
+    title:  GitHub issue title.
+    description: GitHub issue description.
+    labels: Labels used to tag the GitHub issue.
+    issue_number: GitHub issue number used to find the already created issue.
+  """
+  if issue_number:
+    commented_on_issue, comment_url = comment_on_issue(
+      issue_number=issue_number,
+      comment_description=description)
+    if commented_on_issue:
+      return issue_number, comment_url
+
+  # Issue number was not provided or issue with provided number
+  # is closed. In that case, create a new issue.
+  url = "https://api.github.com/repos/{}/{}/issues".format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME)
+  data = {
+      'owner': _BEAM_REPO_OWNER,
+      'repo': _BEAM_REPO_NAME,
+      'title': title,
+      'body': description,
+  }
+  if labels:
+    data['labels'] = labels
+  response = requests.post(
+      url=url, data=json.dumps(data), headers=_HEADERS).json()
+  return response['number'], response['html_url']
+
+
+def comment_on_issue(issue_number: int,
+                     comment_description: str) -> Tuple[bool, Optional[str]]:
+  """
+  Looks for an issue with the provided issue_number. If an open issue
+  is found, comments on it with the provided description; otherwise
+  does nothing.
+
+  Args:
+    issue_number: A GitHub issue number.
+    comment_description: If an issue with issue_number is open,
+      then comment on the issue using comment_description.
+  """
+  url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
+      _BEAM_REPO_OWNER, _BEAM_REPO_NAME, issue_number)
+  open_issue_response = requests.get(
+      url,
+      json.dumps({
+          'owner': _BEAM_REPO_OWNER,
+          'repo': _BEAM_REPO_NAME,
+          'issue_number': issue_number
+      }),
+      headers=_HEADERS)
+  status_code = open_issue_response.status_code
+  open_issue_response = open_issue_response.json()
+  if status_code == 200 and open_issue_response['state'] == 'open':
+    data = {
+        'owner': _BEAM_REPO_OWNER,
+        'repo': _BEAM_REPO_NAME,
+        'body': comment_description,
+        'issue_number': issue_number,
+    }
+    response = requests.post(
+        open_issue_response['comments_url'], json.dumps(data), headers=_HEADERS)
+    return True, response.json()['html_url']
+
+  return False, None
+
+
+def get_issue_description(
+    metric_name: str,
+    timestamps: List,

Review Comment:
   add the types of the list elements
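   For example (sketch only; the element types below are my assumption based on how the
   values are used, e.g. timestamps coming back from the BQ fetch as pandas Timestamps):

   import pandas as pd
   from typing import List

   def get_issue_description(
       metric_name: str,
       timestamps: List[pd.Timestamp],
       metric_values: List[float],
       change_point_index: int,
       max_results_to_display: int = 5) -> str:
     ...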



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'

Review Comment:
   nit: consider 
   _GITHUB_REPO_OWNER
   _GITHUB_REPO_NAME



##########
sdks/python/apache_beam/testing/analyzers/perf_regression_analysis.py:
##########
@@ -0,0 +1,414 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script runs Change Point Analysis using a config file.
+# The config file holds the parameters required to fetch data and to run the
+# change point analysis. Change Point Analysis is used to find performance
+# regressions in benchmark/load/performance tests.
+
+import argparse
+import logging
+import os
+import time
+import uuid
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import yaml
+from google.api_core import exceptions
+
+from apache_beam.testing.analyzers.github_issues_utils import create_or_comment_issue
+from apache_beam.testing.analyzers.github_issues_utils import get_issue_description
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
+from apache_beam.testing.load_tests.load_test_metrics_utils import FetchMetrics
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+_BQ_PROJECT_NAME = 'apache-beam-testing'
+_BQ_DATASET = 'beam_perf_storage'
+
+UNIQUE_ID = 'test_id'
+ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp'
+CHANGEPOINT_TIMESTAMP_LABEL = 'change_point_timestamp'
+CHANGE_POINT_LABEL = 'change_point'
+TEST_NAME = 'test_name'
+METRIC_NAME = 'metric_name'
+ISSUE_NUMBER = 'issue_number'
+ISSUE_URL = 'issue_url'
+# Number of results to display in the issue description,
+# counted from the change point index in both directions.
+NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10
+
+SCHEMA = [{
+    'name': UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED'
+},
+          {
+              'name': ISSUE_CREATION_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGEPOINT_TIMESTAMP_LABEL,
+              'field_type': 'TIMESTAMP',
+              'mode': 'REQUIRED'
+          },
+          {
+              'name': CHANGE_POINT_LABEL,
+              'field_type': 'FLOAT64',
+              'mode': 'REQUIRED'
+          }, {
+              'name': METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED'
+          }, {
+              'name': ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED'
+          }]
+
+TITLE_TEMPLATE = """
+  Performance Regression: {}:{}
+"""
+# TODO: Add mean value before and mean value after.
+_METRIC_DESCRIPTION = """
+  Affected metric: `{}`
+"""
+_METRIC_INFO = "timestamp: {}, metric_value: `{}`"
+ISSUE_LABELS = ['perf-alerts']
+
+
+class GitHubIssueMetaData:
+  """
+  This class holds metadata that needs to be published to
+  BigQuery when a GitHub issue is created for a performance
+  alert.
+  """
+  def __init__(
+      self,
+      issue_creation_timestamp,
+      change_point_timestamp,
+      test_name,
+      metric_name,
+      issue_number,
+      issue_url,
+      test_id,
+      change_point):
+    self.issue_creation_timestamp = issue_creation_timestamp
+    self.change_point_timestamp = change_point_timestamp
+    self.test_name = test_name
+    self.metric_name = metric_name
+    self.issue_number = issue_number
+    self.issue_url = issue_url
+    self.test_id = test_id
+    self.change_point = change_point
+
+  def as_dict(self) -> Dict:
+    return {
+        ISSUE_CREATION_TIMESTAMP_LABEL: self.issue_creation_timestamp,
+        CHANGEPOINT_TIMESTAMP_LABEL: self.change_point_timestamp,
+        TEST_NAME: self.test_name,
+        METRIC_NAME: self.metric_name,
+        ISSUE_NUMBER: self.issue_number,
+        UNIQUE_ID: self.test_id,
+        CHANGE_POINT_LABEL: self.change_point,
+        ISSUE_URL: self.issue_url
+    }
+
+
+class ChangePointAnalysis:
+  def __init__(
+      self,
+      data: Union[List[float], List[List[float]], np.ndarray],
+      metric_name: str,
+  ):
+    self.data = data
+    self.metric_name = metric_name
+
+  def edivisive_means(self,
+                      pvalue: float = 0.05,
+                      permutations: int = 100) -> List[int]:
+    """
+    Performs e-divisive means on the data and returns the indices of the
+    change points.
+
+    Args:
+     pvalue: p value for the permutation test.
+     permutations: Number of permutations for the permutation test.
+
+    Returns:
+     The indices of change points.
+    """
+    return e_divisive(self.data, pvalue, permutations)
+
+
+def is_change_point_in_valid_window(
+    change_point_to_recent_run_window: int, change_point_index: int) -> bool:
+  # If the change point is more than N runs behind the most recent run,
+  # ignore the change point and don't raise an alert for it.
+  if change_point_to_recent_run_window >= change_point_index:
+    return True
+  return False
+
+
+def has_sibling_change_point(
+    change_point_index: int,
+    change_point_sibling_distance: int,
+    metric_values: List,
+    metric_name: str,
+    test_name: str,
+    change_point_timestamp: float,

Review Comment:
   is change_point_timestamp really a float here?
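   If this is the value read out of the BQ dataframe, a timestamp annotation is probably
   closer to reality. A sketch (the exact element types here are my assumption):

   import pandas as pd
   from typing import List, Optional, Tuple

   def has_sibling_change_point(
       change_point_index: int,
       change_point_sibling_distance: int,
       metric_values: List[float],
       metric_name: str,
       test_name: str,
       change_point_timestamp: pd.Timestamp,
   ) -> Optional[Tuple[bool, Optional[int]]]:
     ...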



##########
sdks/python/apache_beam/testing/analyzers/github_issues_utils.py:
##########
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import logging
+import os
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import requests
+
+try:
+  _GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
+except KeyError as e:
+  _GITHUB_TOKEN = None
+  logging.warning(
+      'A Github Personal Access token is required '
+      'to create Github Issues.')
+
+_BEAM_REPO_OWNER = 'AnandInguva'
+_BEAM_REPO_NAME = 'beam'
+_HEADERS = {
+    "Authorization": 'token {}'.format(_GITHUB_TOKEN),
+    "Accept": "application/vnd.github+json"
+}
+
+# Fill the GitHub issue description with the below variables.
+_ISSUE_DESCRIPTION_HEADER = """

Review Comment:
   If I received an alert, I would want to see the dynamics, including the most recent readings of the metric.
   If the alert was fired some time ago, the information in the issue may not be current. How would I go about that?
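   One way to keep it current (just a sketch built on the helpers in this PR; the surrounding
   variables are assumed to be in scope at the call site) would be to re-post the refreshed
   readings as a comment whenever the analysis re-runs and still flags the regression:

   # Sketch: refresh the readings on the existing issue on every re-run that
   # still detects the regression, so the issue always shows current data.
   description = get_issue_description(
       metric_name=metric_name,
       timestamps=timestamps,
       metric_values=metric_values,
       change_point_index=change_point_index,
       max_results_to_display=10)
   commented, comment_url = comment_on_issue(
       issue_number=issue_number, comment_description=description)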


