You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/22 14:33:49 UTC
[GitHub] [airflow] nazarsh opened a new issue #16590: google.api_core.exceptions.Unknown: None Stream removed (Snowflake and GCP Secret Manager)
nazarsh opened a new issue #16590:
URL: https://github.com/apache/airflow/issues/16590
**Apache Airflow version**: 2.1.0
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`): N/A
**Environment**:
- **Cloud provider or hardware configuration**: Astronomer-based local setup using Docker `quay.io/astronomer/ap-airflow:2.1.0-2-buster-onbuild`
- **OS** (e.g. from /etc/os-release): `Debian GNU/Linux 10 (buster)`
- **Kernel** (e.g. `uname -a`): `Linux 7a92d1fd4406 5.10.25-linuxkit #1 SMP Tue Mar 23 09:27:39 UTC 2021 x86_64 GNU/Linux`
- **Install tools**: apache-airflow-providers-snowflake
- **Others**:
**What happened**:
Having configured Snowflake connection and pointing to GCP Secret Manager backend `AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend` I am getting a pretty consistent error traced all the way down to gRPC
```File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
return callable_(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Stream removed"
debug_error_string = "{"created":"@1624370913.481874500","description":"Error received from peer ipv4:172.xxx.xx.xxx:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Stream removed","grpc_status":2}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/qe/weekly.py", line 63, in snfk_hook
df = hook.get_pandas_df(sql)
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 116, in get_pandas_df
with closing(self.get_conn()) as conn:
File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 220, in get_conn
conn_config = self._get_conn_params()
File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 152, in _get_conn_params
self.snowflake_conn_id # type: ignore[attr-defined] # pylint: disable=no-member
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/base.py", line 67, in get_connection
conn = Connection.get_connection_from_secrets(conn_id)
File "/usr/local/lib/python3.7/site-packages/airflow/models/connection.py", line 376, in get_connection_from_secrets
conn = secrets_backend.get_connection(conn_id=conn_id)
File "/usr/local/lib/python3.7/site-packages/airflow/secrets/base_secrets.py", line 64, in get_connection
conn_uri = self.get_conn_uri(conn_id=conn_id)
File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 134, in get_conn_uri
return self._get_secret(self.connections_prefix, conn_id)
File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 170, in _get_secret
return self.client.get_secret(secret_id=secret_id, project_id=self.project_id)
File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/_internal_client/secret_manager_client.py", line 86, in get_secret
response = self.client.access_secret_version(name)
File "/usr/local/lib/python3.7/site-packages/google/cloud/secretmanager_v1/gapic/secret_manager_service_client.py", line 968, in access_secret_version
request, retry=retry, timeout=timeout, metadata=metadata
File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
on_error=on_error,
File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
return target()
File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.Unknown: None Stream removed
```
**What you expected to happen**:
DAG successfully retrieves a configured connection for Snowflake from GCP Secret Manager and executes a query returning back a result.
**How to reproduce it**:
1. Configure Google Cloud Platform as secrets backend
`AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend`
2. Configure a Snowflake connection (`requirements.txt` has `apache-airflow-providers-snowflake`)
3. Create a DAG which uses SnowflakeHook similar to this:
```python
import logging
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
args = {"owner": "Airflow", "start_date": airflow.utils.dates.days_ago(2)}
dag = DAG(
dag_id="snowflake_automation", default_args=args, schedule_interval=None
)
snowflake_query = [
"""create table public.test_employee (id number, name string);""",
"""insert into public.test_employee values(1, “Sam”),(2, “Andy”),(3, “Gill”);""",
]
def get_row_count(**context):
dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
result = dwh_hook.get_first("select count(*) from public.test_employee")
logging.info("Number of rows in `public.test_employee` - %s", result[0])
with dag:
create_insert = SnowflakeOperator(
task_id="snowfalke_create",
sql=snowflake_query ,
snowflake_conn_id="snowflake_conn",
)
get_count = PythonOperator(task_id="get_count", python_callable=get_row_count)
create_insert >> get_count
```
**Anything else we need to know**:
I looked around to see if this is an issue with Google's `api-core` and it seems like somebody has done research into it to point out that it might be downstream implementation issue and not the `api-core` issue: https://stackoverflow.com/questions/67374613/why-does-accessing-this-variable-fail-after-it-is-used-in-a-thread
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on issue #16590: google.api_core.exceptions.Unknown: None Stream removed (Snowflake and GCP Secret Manager)
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #16590:
URL: https://github.com/apache/airflow/issues/16590#issuecomment-866037394
Thanks for opening your first issue here! Be sure to follow the issue template!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] potiuk closed issue #16590: google.api_core.exceptions.Unknown: None Stream removed (Snowflake and GCP Secret Manager)
Posted by GitBox <gi...@apache.org>.
potiuk closed issue #16590:
URL: https://github.com/apache/airflow/issues/16590
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] potiuk commented on issue #16590: google.api_core.exceptions.Unknown: None Stream removed (Snowflake and GCP Secret Manager)
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #16590:
URL: https://github.com/apache/airflow/issues/16590#issuecomment-886253241
I am afraid there isn't much we can do in Airflow to fix it. Closing it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org