Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/31 14:43:06 UTC

[GitHub] [airflow] VBhojawala opened a new pull request #13405: KubernetesPodOperator Guide

VBhojawala opened a new pull request #13405:
URL: https://github.com/apache/airflow/pull/13405


   Added KubernetesPodOperator guide in KubernetesPodOperator.rst
   
   related: #8970


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#issuecomment-851727531


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] VBhojawala commented on a change in pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#discussion_r570704372



##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -115,14 +905,169 @@ alongside the Pod. The Pod must write the XCom value into this location at the `
 
 See the following example on how this occurs:
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_write_xcom]
-    :end-before: [END howto_operator_k8s_write_xcom]
 
-Reference
-^^^^^^^^^
-For further information, look at:
+- Example Dag : KubernetesPodOperator task writes contents to be returned to ``/airflow/xcom/return.json`` and reading
+  values returned using ``xcom_pull(key, task_ids)``.
+
+.. code-block:: python

Review comment:
       I have extracted the example and tagged it in the doc.
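
The pod-side half of the XCom example quoted above can be sketched in a few lines of Python. This is a minimal sketch, not the code from the PR: the helper name `write_xcom_result` is hypothetical, and the `path` parameter exists only so the function can be tried outside a pod. Its default is the `/airflow/xcom/return.json` location named in the diff.

```python
import json
import os
import tempfile


def write_xcom_result(value, path="/airflow/xcom/return.json"):
    """Serialize `value` as JSON into the file the XCom mechanism reads.

    `write_xcom_result` is a hypothetical helper; only the default path
    comes from the quoted documentation.
    """
    # Create the parent directory if it is missing (useful for local tests).
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as handle:
        json.dump(value, handle)


# Local dry run against a temporary directory instead of the real mount.
demo_path = os.path.join(tempfile.mkdtemp(), "airflow", "xcom", "return.json")
write_xcom_result({"rows": 3}, path=demo_path)
with open(demo_path) as handle:
    print(handle.read())
```

Inside the pod the function would be called with the default path, and a downstream task would read the value with `xcom_pull`, as the quoted bullet describes.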







[GitHub] [airflow] VBhojawala edited a comment on pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala edited a comment on pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#issuecomment-763344070


   Hi @dimberman ,
   Thank you for reviewing the PR.
   I have added the text from the previous document and made the changes suggested in the review.





[GitHub] [airflow] john-jac commented on pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
john-jac commented on pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#issuecomment-756475814


   Hi @VBhojawala !  Just reviewing your doc and it looks good.  FYI there's one focused on Amazon MWAA and EKS at https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html that might be worth a reference as it, too, simplifies the authorization process.





[GitHub] [airflow] dimberman commented on a change in pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#discussion_r594032862



##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,28 +15,187 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
 
+.. contents:: :local:
 
-.. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` allows
-you to create and run Pods on a Kubernetes cluster.
+KubernetesPodOperator with YAML file / JSON Spec
+================================================
+
+- Creating a Pod Template file

Review comment:
       ```suggestion
   Below is an example of a basic ``pod_template_file`` for a KubernetesPodOperator pod. Please note that the first container is named ``base``, as this is required for Airflow to later run the pod.
   ```
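
The `base` naming requirement in the suggestion above is easy to check before a template is handed to the operator. The following is a minimal sketch under stated assumptions: the template has already been loaded into a plain Python dict (for example with a YAML parser), and `first_container_is_base` is a hypothetical helper, not part of Airflow.

```python
def first_container_is_base(pod_template):
    """Return True if the first container in the pod spec is named 'base'.

    Hypothetical helper: `pod_template` is the pod manifest as a dict.
    """
    containers = pod_template.get("spec", {}).get("containers", [])
    return bool(containers) and containers[0].get("name") == "base"


# Trimmed-down version of the template shown in the diff.
template = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "myapp-pod", "namespace": "default"},
    "spec": {"containers": [{"name": "base", "image": "alpine"}]},
}

print(first_container_is_base(template))
```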
   

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,28 +15,187 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
 
+.. contents:: :local:
 
-.. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` allows
-you to create and run Pods on a Kubernetes cluster.
+KubernetesPodOperator with YAML file / JSON Spec
+================================================
+
+- Creating a Pod Template file
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: Pod
+  metadata:
+    labels:
+      app: myapp
+    name: myapp-pod
+    namespace: default
+  spec:
+    containers:
+      - name: base
+        image: alpine
+        env:
+          - name: TIME_OUT
+            value: '15'
+          - name: ENV_TYPE
+            value: 'test'
+        envFrom:
+          - secretRef:
+              name: db-credentials
+        volumeMounts:
+          - name: myapp-volume
+            mountPath: /root/myapp
+        resources:
+          limits:
+            cpu: 2
+            memory: "200Mi"
+          requests:
+            cpu: 1
+            memory: "100Mi"
+        command: ['sh', '-c', 'echo "myapp-pod created from YAML pod template."']
+    volumes:
+      - name: myapp-volume
+        persistentVolumeClaim:
+          claimName: myapp-pvc-rw
+
+
+- Creating pod from a template using KubernetesPodOperator
+
+.. code-block:: python
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_yaml_config", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      pod_template_path = '/abs/path/to/file/k8s_pod_template.yaml'
+      task1 = KubernetesPodOperator(task_id='k8s_pod_yaml_config_task',
+                                    pod_template_file=pod_template_path,
+                                    namespace='default',
+                                    name='airflow_pod_yaml_config',
+                                    startup_timeout_seconds=60,)
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_yaml_config
+  AIRFLOW_CTX_TASK_ID=k8s_pod_yaml_config_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-05T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-05T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: myapp-pod had an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: myapp-pod
+  {pod_launcher.py:176} INFO - Event: myapp-pod had an event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id myapp-pod Succeeded
+
+  {pod_launcher.py:136} INFO - myapp-pod created from YAML pod template.
 
-.. contents::
-  :depth: 1
-  :local:
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_yaml_config, task_id=
+  k8s_pod_yaml_config_task, execution_date=20201205T000000, start_date=20201206T130803, end_date=20201206T130818
+  {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
+  {dagrun.py:447} INFO - Marking run <DagRun example_k8s_yaml_config @ 2020-12-05 00:00:00+00:00:
+  backfill__2020-12-05T00:00:00+00:00, externally triggered: False> successful
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
 
-.. note::
-  If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, consider
-  using the
-  :ref:`GKEStartPodOperator <howto/operator:GKEStartPodOperator>` operator as it
-  simplifies the Kubernetes authorization process.
 
-.. note::
-  The :doc:`Kubernetes executor <apache-airflow:executor/kubernetes>` is **not** required to use this operator.
+- Examining pod configuration

Review comment:
       ```suggestion
   If you need to examine a pod that is not running correctly, run the command ``kubectl describe pod <my pod name>`` to get all relevant details about the pod from the k8s cluster.
   ```
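
When the pod name comes from a script rather than the terminal, the same command can be assembled in Python. This is a sketch only: `describe_pod_command` is a hypothetical helper, and the `--namespace` default of `default` is an assumption that mirrors the examples in the diff.

```python
import shlex


def describe_pod_command(pod_name, namespace="default"):
    """Build the argv list for `kubectl describe pod` (hypothetical helper)."""
    return ["kubectl", "describe", "pod", pod_name, "--namespace", namespace]


# Print the shell-quoted command instead of running it, so the sketch
# works on machines without kubectl or cluster access.
print(shlex.join(describe_pod_command("myapp-pod")))
# prints: kubectl describe pod myapp-pod --namespace default
```

To run it for real, pass the list to `subprocess.run(..., check=True)` on a machine where `kubectl` is configured for the cluster.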

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+

Review comment:
       Can you put this in an example Python file?
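
For context, the existing docs pull code out of example files with `exampleinclude` and `[START ...]` / `[END ...]` markers, as in the removed `howto_operator_k8s_private_image` block. The sketch below approximates that selection in plain Python; it is not the Sphinx implementation, and the `howto_operator_k8s_env_vars` tag is a hypothetical name chosen for illustration.

```python
def extract_marked_snippet(source, tag):
    """Return the lines strictly between the [START tag] and [END tag] markers.

    Rough approximation of start-after / end-before selection.
    """
    lines = source.splitlines()
    start = next(i for i, line in enumerate(lines) if f"[START {tag}]" in line)
    end = next(i for i in range(start + 1, len(lines)) if f"[END {tag}]" in lines[i])
    return "\n".join(lines[start + 1:end])


# A stand-in for an example DAG file; the tag name is hypothetical.
example_file = """from airflow import DAG
# [START howto_operator_k8s_env_vars]
env_vars = {"TIME_OUT": "5", "ENV_TYPE": "test"}
# [END howto_operator_k8s_env_vars]
"""

print(extract_marked_snippet(example_file, "howto_operator_k8s_env_vars"))
```

Keeping the code in a real example file means it is imported and checked along with the other example DAGs instead of living only in the rst source.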

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+          schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+   {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent  Volume to Pod
+==================================
+
+- Creating PersistentVolume

Review comment:
       ```suggestion
   To add a PersistentVolume to your KubernetesPodOperator deployment, start by adding a PersistentVolume and a PersistentVolumeClaim as shown below.
   ```
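
Before applying the two manifests it can help to confirm that the claim is able to bind to the volume. The following is a simplified sketch, not Kubernetes' actual binding logic: it compares only requested storage and access modes, ignores storage class, selectors and volume mode, and handles only the `Ki`, `Mi` and `Gi` suffixes. Both helper names are hypothetical.

```python
_BINARY_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30}


def parse_quantity(text):
    """Convert a quantity such as '20Mi' to bytes (subset of suffixes only)."""
    for suffix, factor in _BINARY_UNITS.items():
        if text.endswith(suffix):
            return int(text[: -len(suffix)]) * factor
    return int(text)


def claim_fits_volume(pv_spec, pvc_spec):
    """Check capacity and access modes only; a deliberate simplification."""
    requested = parse_quantity(pvc_spec["resources"]["requests"]["storage"])
    capacity = parse_quantity(pv_spec["capacity"]["storage"])
    modes_ok = set(pvc_spec["accessModes"]) <= set(pv_spec["accessModes"])
    return requested <= capacity and modes_ok


# The `spec` sections of myapp-pv and myapp-pvc-rw from the diff, as dicts.
pv_spec = {"capacity": {"storage": "20Mi"}, "accessModes": ["ReadWriteMany"]}
pvc_spec = {
    "resources": {"requests": {"storage": "20Mi"}},
    "accessModes": ["ReadWriteMany"],
}

print(claim_fits_volume(pv_spec, pvc_spec))
```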

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+          schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+=================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: myapp-pvc-rw
+  spec:
+    resources:
+      requests:
+        storage: 20Mi
+    accessModes:
+    - ReadWriteMany
+    storageClassName: ""
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pvc.yaml
+
+
+- Writing and Reading file from Persistent Volume using KubernetesPodOperator
+
+.. code-block:: python
+
+  from kubernetes.client import V1VolumeMount, V1Volume, V1PersistentVolumeClaimVolumeSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_volume", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      myapp_volume = V1Volume(
+          name='myapp-volume',
+          persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='myapp-pvc-rw'))
+
+      myapp_volume_mount = V1VolumeMount(mount_path='/root/myapp', name='myapp-volume')
+
+      task1 = KubernetesPodOperator(task_id='k8s_volume_read_task',
+                                    name='airflow_pod_volume_read',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'date > /root/myapp/date.txt',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task2 = KubernetesPodOperator(task_id='k8s_volume_write_task',
+                                    name='airflow_pod_volume_write',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'echo "Reading date from date.txt : "$(cat /root/myapp/date.txt)',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task1 >> task2
+
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_read_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_write_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Running
+
+  {pod_launcher.py:136} INFO - Reading date from date.txt : Sat Dec 5 13:58:35 UTC 2020
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+

Review comment:
       ```suggestion
   ```
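
The sizes that `kubectl describe` reports for `db-credentials` in the hunk above (4 bytes for `DB_USER`, 13 bytes for `DB_PWD`) depend on `echo -n` suppressing the trailing newline. A minimal sketch in plain Python that checks those sizes and shows the base64 form Kubernetes uses for Secret `data` values. The helper name `encode_secret_value` is hypothetical, and nothing here calls the Kubernetes API:

```python
import base64


def encode_secret_value(value: str) -> str:
    """Return the base64 text that a Secret manifest's ``data`` field would carry."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


# Values copied from the ``echo -n`` commands in the hunk above.
db_user = "root"
db_pwd = "ent3r$ce@d00r"

# Byte lengths match the ``kubectl describe`` output only when no newline is appended.
sizes = {
    "DB_USER": len(db_user.encode("utf-8")),
    "DB_PWD": len(db_pwd.encode("utf-8")),
}

encoded_user = encode_secret_value(db_user)
# Decoding round-trips to the original value.
decoded_user = base64.b64decode(encoded_user).decode("utf-8")
```

With a plain `echo` (no `-n`) each value would gain one newline byte, so the sizes would read 5 and 14 and the database login would most likely fail.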

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -53,25 +212,120 @@ dependencies that are not available through the public PyPI repository. It also
 YAML file using the ``pod_template_file`` parameter.
 Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:
 
 .. code-block:: python
 
-  from kubernetes.client import models as k8s
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
 
-With this API object, you can have access to all Kubernetes API objects in the form of python classes.
-Using this method will ensure correctness
-and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
-:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................

Review comment:
       Why did you add more log lines here?

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+

Review comment:
       ```suggestion
   ```
   
   We don't need to show logs for all of these.

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -53,25 +212,120 @@ dependencies that are not available through the public PyPI repository. It also
 YAML file using the ``pod_template_file`` parameter.
 Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:
 
 .. code-block:: python
 
-  from kubernetes.client import models as k8s
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
 
-With this API object, you can have access to all Kubernetes API objects in the form of python classes.
-Using this method will ensure correctness
-and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
-:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................
+
+- Getting Kubernetes pods using the labels ``dag_id`` and ``task_id``, which Airflow assigns automatically, and describing them.
+
+.. code-block:: bash
+
+  $ kubectl get pods -l dag_id=example_k8s_operator,task_id=k8s_pod_operator_task
+
+    NAME                                                    READY   STATUS      RESTARTS   AGE
+    airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c   0/1     Completed   0          14m
+
+  $ kubectl describe pod airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+
+    Name:         airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+    Namespace:    default
+    Priority:     0
+    Node:         minikube/192.168.49.2
+    Start Time:   Fri, 04 Dec 2020 19:33:36 +0530
+    Labels:       airflow_version=2.0.0b2
+                  dag_id=example_k8s_operator
+                  execution_date=2020-12-03T0000000000-767fcb862
+                  kubernetes_pod_operator=True
+                  task_id=k8s_pod_operator_task
+                  try_number=1
+    Annotations:  <none>
+    Status:       Succeeded
+    IP:           172.17.0.7
+    IPs:
+      IP:  172.17.0.7
+    Containers:
+      base:
+        Container ID:  docker://56c91324dc925b0bad0d60474e35d8c7eb7fad7d8410ca123b657f1416207504
+        Image:         alpine
+        Image ID:      docker-pullable://alpine@sha256:c0e9560cda118f9ec63ddefb4a173a2b2a0347082d7dff7dc14272e7841a5b5a
+        Port:          <none>
+        Host Port:     <none>
+        Command:
+          sh
+          -c
+          echo "Hello World from pod [$HOSTNAME]"
+        State:          Terminated
+          Reason:       Completed
+          Exit Code:    0
+          Started:      Fri, 04 Dec 2020 19:33:43 +0530
+          Finished:     Fri, 04 Dec 2020 19:33:43 +0530
+        Ready:          False
+        Restart Count:  0
+        Environment:    <none>
+        Mounts:
+          /var/run/secrets/kubernetes.io/serviceaccount from default-token-ltgdm (ro)
+    Conditions:
+      Type              Status
+      Initialized       True
+      Ready             False
+      ContainersReady   False
+      PodScheduled      True
+    Volumes:
+      default-token-ltgdm:
+        Type:        Secret (a volume populated by a Secret)
+        SecretName:  default-token-ltgdm
+        Optional:    false
+    QoS Class:       BestEffort
+    Node-Selectors:  <none>
+    Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
+                     node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
+    Events:
+      Type    Reason     Age   From               Message
+      ----    ------     ----  ----               -------
+      Normal  Scheduled  15m   default-scheduler  Successfully assigned default/airflow-pod-operator
+                                                  -aed97ecd64854367ad7d0ff39f37859c to minikube
+      Normal  Pulling    15m   kubelet            Pulling image "alpine"
+      Normal  Pulled     15m   kubelet            Successfully pulled image "alpine" in 4.214686688s
+      Normal  Created    15m   kubelet            Created container base
+      Normal  Started    15m   kubelet            Started container base

Review comment:
       ```suggestion
   ```
   
   We don't need to show this twice

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,28 +15,187 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
 
+.. contents:: :local:
 
-.. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` allows
-you to create and run Pods on a Kubernetes cluster.
+KubernetesPodOperator with YAML file / JSON Spec
+================================================
+
+- Creating a Pod Template file
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: Pod
+  metadata:
+    labels:
+      app: myapp
+    name: myapp-pod
+    namespace: default
+  spec:
+    containers:
+      - name: base
+        image: alpine
+        env:
+          - name: TIME_OUT
+            value: '15'
+          - name: ENV_TYPE
+            value: 'test'
+        envFrom:
+          - secretRef:
+              name: db-credentials
+        volumeMounts:
+          - name: myapp-volume
+            mountPath: /root/myapp
+        resources:
+          limits:
+            cpu: 2
+            memory: "200Mi"
+          requests:
+            cpu: 1
+            memory: "100Mi"
+        command: ['sh', '-c', 'echo "myapp-pod created from YAML pod template."']
+    volumes:
+      - name: myapp-volume
+        persistentVolumeClaim:
+          claimName: myapp-pvc-rw
+
+
+- Creating pod from a template using KubernetesPodOperator

Review comment:
       ```suggestion
   Once you have created a pod_template_file, you can use this file as a basis for your KPO pod, while using the other supplied arguments as "overrides". In this example, we are able to take this template and modify the namespace and name of the pod.
   ```
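
To make the "template plus overrides" idea concrete, here is a conceptual sketch using plain dictionaries. It is an illustration only and is not Airflow's reconciliation code: the function `apply_overrides`, the override values, and the trimmed-down template are all assumptions made for this example.

```python
import copy


def apply_overrides(template: dict, overrides: dict) -> dict:
    """Recursively merge ``overrides`` into a deep copy of ``template``."""
    merged = copy.deepcopy(template)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Descend into nested mappings so sibling keys are preserved.
            merged[key] = apply_overrides(merged[key], value)
        else:
            merged[key] = value
    return merged


# Trimmed-down version of the pod template shown in the hunk above.
pod_template = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "myapp-pod", "namespace": "default", "labels": {"app": "myapp"}},
    "spec": {"containers": [{"name": "base", "image": "alpine"}]},
}

# Hypothetical values standing in for the operator's ``name`` and ``namespace`` arguments.
operator_overrides = {
    "metadata": {"name": "myapp-pod-from-airflow", "namespace": "airflow-jobs"},
}

pod = apply_overrides(pod_template, operator_overrides)
```

Only the overridden fields change; the labels and the container spec still come from the template, and the template itself is left untouched.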

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -53,25 +212,120 @@ dependencies that are not available through the public PyPI repository. It also
 YAML file using the ``pod_template_file`` parameter.
 Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:
 
 .. code-block:: python
 
-  from kubernetes.client import models as k8s
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
 
-With this API object, you can have access to all Kubernetes API objects in the form of python classes.
-Using this method will ensure correctness
-and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
-:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................

Review comment:
       ```suggestion
   ```

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -53,25 +212,120 @@ dependencies that are not available through the public PyPI repository. It also
 YAML file using the ``pod_template_file`` parameter.
 Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:
 
 .. code-block:: python
 
-  from kubernetes.client import models as k8s
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
 
-With this API object, you can have access to all Kubernetes API objects in the form of python classes.
-Using this method will ensure correctness
-and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
-:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................
+
+- Getting Kubernetes pods using the labels ``dag_id`` and ``task_id``, which Airflow assigns automatically, and describing them.
+
+.. code-block:: bash
+
+  $ kubectl get pods -l dag_id=example_k8s_operator,task_id=k8s_pod_operator_task
+
+    NAME                                                    READY   STATUS      RESTARTS   AGE
+    airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c   0/1     Completed   0          14m
+
+  $ kubectl describe pod airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+
+    Name:         airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+    Namespace:    default
+    Priority:     0
+    Node:         minikube/192.168.49.2
+    Start Time:   Fri, 04 Dec 2020 19:33:36 +0530
+    Labels:       airflow_version=2.0.0b2
+                  dag_id=example_k8s_operator
+                  execution_date=2020-12-03T0000000000-767fcb862
+                  kubernetes_pod_operator=True
+                  task_id=k8s_pod_operator_task
+                  try_number=1
+    Annotations:  <none>
+    Status:       Succeeded
+    IP:           172.17.0.7
+    IPs:
+      IP:  172.17.0.7
+    Containers:
+      base:
+        Container ID:  docker://56c91324dc925b0bad0d60474e35d8c7eb7fad7d8410ca123b657f1416207504
+        Image:         alpine
+        Image ID:      docker-pullable://alpine@sha256:c0e9560cda118f9ec63ddefb4a173a2b2a0347082d7dff7dc14272e7841a5b5a
+        Port:          <none>
+        Host Port:     <none>
+        Command:
+          sh
+          -c
+          echo "Hello World from pod [$HOSTNAME]"
+        State:          Terminated
+          Reason:       Completed
+          Exit Code:    0
+          Started:      Fri, 04 Dec 2020 19:33:43 +0530
+          Finished:     Fri, 04 Dec 2020 19:33:43 +0530
+        Ready:          False
+        Restart Count:  0
+        Environment:    <none>
+        Mounts:
+          /var/run/secrets/kubernetes.io/serviceaccount from default-token-ltgdm (ro)
+    Conditions:
+      Type              Status
+      Initialized       True
+      Ready             False
+      ContainersReady   False
+      PodScheduled      True
+    Volumes:
+      default-token-ltgdm:
+        Type:        Secret (a volume populated by a Secret)
+        SecretName:  default-token-ltgdm
+        Optional:    false
+    QoS Class:       BestEffort
+    Node-Selectors:  <none>
+    Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
+                     node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
+    Events:
+      Type    Reason     Age   From               Message
+      ----    ------     ----  ----               -------
+      Normal  Scheduled  15m   default-scheduler  Successfully assigned default/airflow-pod-operator
+                                                  -aed97ecd64854367ad7d0ff39f37859c to minikube
+      Normal  Pulling    15m   kubelet            Pulling image "alpine"
+      Normal  Pulled     15m   kubelet            Successfully pulled image "alpine" in 4.214686688s
+      Normal  Created    15m   kubelet            Created container base
+      Normal  Started    15m   kubelet            Started container base
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_cluster_resources]
-    :end-before: [END howto_operator_k8s_cluster_resources]

Review comment:
       Why did you remove this?

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+          schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+

Review comment:
       ```suggestion
   ```

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes

Review comment:
       ```suggestion
   If you would like to use a ConfigMap in your Kubernetes deployment, first add the ConfigMap to your namespace before launching the pod. Below is an example of a basic ConfigMap.
   ```
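
For readers who prefer to generate the manifest rather than hand-write it, here is a minimal sketch in plain Python. It rebuilds the `myapp-config` ConfigMap from this PR as a dict and prints it as JSON, which `kubectl apply -f` accepts as well as YAML. The helper name `build_config_map` and the file name `k8s_configmap.json` are hypothetical, used only for illustration.

```python
import json


def build_config_map(name: str, data: dict) -> dict:
    """Return a v1 ConfigMap manifest as a plain dict (hypothetical helper)."""
    # ConfigMap data values must be strings, so coerce values such as 15 to "15".
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name},
        "data": {key: str(value) for key, value in data.items()},
    }


if __name__ == "__main__":
    manifest = build_config_map("myapp-config", {"TIME_OUT": 15, "ENV_TYPE": "test"})
    # Save the output as e.g. k8s_configmap.json and run `kubectl apply -f` on it.
    print(json.dumps(manifest, indent=2))
```

The string coercion is the one design choice worth noting: an unquoted number in a ConfigMap's `data` is rejected by the API server, which is why the YAML in the PR quotes `"15"`.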

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -53,25 +212,120 @@ dependencies that are not available through the public PyPI repository. It also
 YAML file using the ``pod_template_file`` parameter.
 Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:
 
 .. code-block:: python
 
-  from kubernetes.client import models as k8s
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
 
-With this API object, you can have access to all Kubernetes API objects in the form of python classes.
-Using this method will ensure correctness
-and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
-:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................
+
+- Getting kubernetes pods using labels ``dag_id`` and ``task_id`` automatically assigned by Airflow and Describing it.

Review comment:
       ```suggestion
   Another way to find the pod launched by Airflow is to use the labels that Airflow injects into every KubernetesPodOperator pod. The command below returns the name of the pod launched for a given task.
   
   ``kubectl get pods -l dag_id=<your dag id>,task_id=<your task id>``
   ```
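
A small sketch of the same lookup, assuming the `dag_id` and `task_id` label keys shown in the `kubectl describe` output quoted in this PR. The two helper functions are hypothetical names, not part of Airflow. The sketch also assumes the ids contain only characters that are valid in label values. Airflow may sanitize other ids before using them as labels, so the selector might not match verbatim in that case.

```python
import shlex


def build_label_selector(dag_id: str, task_id: str) -> str:
    """Return a label selector matching the pod created for one Airflow task."""
    return f"dag_id={dag_id},task_id={task_id}"


def kubectl_get_pods_command(dag_id: str, task_id: str, namespace: str = "default") -> str:
    """Build the kubectl command line that lists pods carrying those labels."""
    selector = build_label_selector(dag_id, task_id)
    args = ["kubectl", "get", "pods", "-n", namespace, "-l", selector]
    # Quote each argument so the command is safe to paste into a shell.
    return " ".join(shlex.quote(arg) for arg in args)


if __name__ == "__main__":
    print(kubectl_get_pods_command("example_k8s_operator", "k8s_pod_operator_task"))
```

The same selector string can be passed as the `label_selector` argument of `CoreV1Api.list_namespaced_pod` in the Kubernetes Python client if the lookup needs to happen from code rather than from a shell.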

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -53,25 +212,120 @@ dependencies that are not available through the public PyPI repository. It also
 YAML file using the ``pod_template_file`` parameter.
 Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:

Review comment:
       Why did you remove this section?

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+          schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )

Review comment:
       Please move this into its own python file and then reference it, unless @kaxil or @potiuk thinks it's ok to keep small snippets in documentation.

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,28 +15,187 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
 
+.. contents:: :local:
 
-.. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` allows
-you to create and run Pods on a Kubernetes cluster.
+KubernetesPodOperator with YAML file / JSON Spec
+================================================
+
+- Creating a Pod Template file
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: Pod
+  metadata:
+    labels:
+      app: myapp
+    name: myapp-pod
+    namespace: default
+  spec:
+    containers:
+      - name: base
+        image: alpine
+        env:
+          - name: TIME_OUT
+            value: '15'
+          - name: ENV_TYPE
+            value: 'test'
+        envFrom:
+          - secretRef:
+              name: db-credentials
+        volumeMounts:
+          - name: myapp-volume
+            mountPath: /root/myapp
+        resources:
+          limits:
+            cpu: 2
+            memory: "200Mi"
+          requests:
+            cpu: 1
+            memory: "100Mi"
+        command: ['sh', '-c', 'echo "myapp-pod created from YAML pod template."']
+    volumes:
+      - name: myapp-volume
+        persistentVolumeClaim:
+          claimName: myapp-pvc-rw
+
+
+- Creating pod from a template using KubernetesPodOperator
+
+.. code-block:: python
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_yaml_config", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      pod_template_path = '/abs/path/to/file/k8s_pod_template.yaml'
+      task1 = KubernetesPodOperator(task_id='k8s_pod_yaml_config_task',
+                                    pod_template_file=pod_template_path,
+                                    namespace='default',
+                                    name='airflow_pod_yaml_config',
+                                    startup_timeout_seconds=60,)
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_yaml_config
+  AIRFLOW_CTX_TASK_ID=k8s_pod_yaml_config_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-05T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-05T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: myapp-pod had an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: myapp-pod
+  {pod_launcher.py:176} INFO - Event: myapp-pod had an event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id myapp-pod Succeeded
+
+  {pod_launcher.py:136} INFO - myapp-pod created from YAML pod template.
 
-.. contents::
-  :depth: 1
-  :local:
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_yaml_config, task_id=
+  k8s_pod_yaml_config_task, execution_date=20201205T000000, start_date=20201206T130803, end_date=20201206T130818
+  {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
+  {dagrun.py:447} INFO - Marking run <DagRun example_k8s_yaml_config @ 2020-12-05 00:00:00+00:00:
+  backfill__2020-12-05T00:00:00+00:00, externally triggered: False> successful
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0

Review comment:
       I'm not sure what the point of this section is. Are you showing what the logs look like? If so, can you give instructions on how to find the logs (e.g. via the UI or via a k8s command)?

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod

Review comment:
       ```suggestion
   Once you have created this ConfigMap, you can access it through the Kubernetes Python client objects. Below is an example of attaching the ConfigMap to a KubernetesPodOperator pod.
   ```
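A minimal sketch of the manifest side of this step, using only the Python standard library. It builds the same `myapp-config` ConfigMap as a plain dict and serializes it to JSON, which `kubectl apply -f` accepts alongside YAML. The helper name `build_configmap_manifest` is hypothetical and is not part of Airflow or the Kubernetes client.

```python
import json


def build_configmap_manifest(name, data):
    """Return a ConfigMap manifest as a plain dict (hypothetical helper)."""
    # ConfigMap values must be strings, so coerce every value explicitly.
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name},
        "data": {key: str(value) for key, value in data.items()},
    }


manifest = build_configmap_manifest("myapp-config", {"TIME_OUT": 15, "ENV_TYPE": "test"})

# With env_from, every key under `data` is exposed as one environment
# variable in the container, so these are the names the pod can read.
exposed_names = sorted(manifest["data"])

print(json.dumps(manifest, indent=2))
print(exposed_names)
```

Coercing values to strings mirrors the quoted `"15"` in the YAML above; an unquoted integer in the `data` field would be rejected by the API server.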

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+          schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable

Review comment:
       ```suggestion
   Now that this Secret has been added to our cluster, we can access it using the following code:
   ```
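For illustration, here is a rough sketch of what code running inside the container might do once `env_from` has exposed the Secret keys as environment variables. It uses only the standard library; `read_db_credentials` and `mask` are hypothetical helpers, and the dict passed in stands in for the real process environment. Unlike the `echo` command in the guide, it masks the password before printing, since anything the pod writes to stdout ends up in the Airflow task log.

```python
import os


def read_db_credentials(environ=None):
    """Return (user, password) from the environment, failing loudly if a key is absent."""
    env = os.environ if environ is None else environ
    missing = [key for key in ("DB_USER", "DB_PWD") if key not in env]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return env["DB_USER"], env["DB_PWD"]


def mask(value):
    """Replace every character with '*' so the value is safe to log."""
    return "*" * len(value)


# A plain dict stands in for the container environment in this sketch.
user, password = read_db_credentials({"DB_USER": "root", "DB_PWD": "ent3r$ce@d00r"})
print(f"DB_USER: {user}   DB_PWD: {mask(password)}")
```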

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
+
+- Creating Secrets

Review comment:
       ```suggestion
   To test adding secrets to a KubernetesPodOperator pod, we first create a basic secret using the steps below.
   ```
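As a sanity check on the `kubectl describe` output in the guide (`DB_PWD: 13 bytes`, `DB_USER: 4 bytes`), the sketch below computes the byte sizes of the two values and the base64 form that the `data` field of a Secret manifest carries. It uses only the standard library, and `encode_secret_data` is a hypothetical helper name.

```python
import base64


def encode_secret_data(values):
    """Base64-encode each value as expected by the `data` field of a Secret (hypothetical helper)."""
    return {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in values.items()
    }


raw = {"DB_USER": "root", "DB_PWD": "ent3r$ce@d00r"}

# Sizes reported by `kubectl describe secrets` are the raw byte lengths.
sizes = {key: len(value.encode("utf-8")) for key, value in raw.items()}

encoded = encode_secret_data(raw)
decoded = {key: base64.b64decode(value).decode("utf-8") for key, value in encoded.items()}

print(sizes)
print(encoded["DB_USER"])
```

Note that base64 is an encoding, not encryption: anyone who can read the Secret object can recover the original values, as the `decoded` round trip shows.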

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+==================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim

Review comment:
       ```suggestion
   ```

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- After executing / debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===========================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+

Review comment:
       ```suggestion
   ```

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+==================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: myapp-pvc-rw
+  spec:
+    resources:
+      requests:
+        storage: 20Mi
+    accessModes:
+    - ReadWriteMany
+    storageClassName: ""
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pvc.yaml
+
+
+- Writing and Reading file from Persistent Volume using KubernetesPodOperator
+
+.. code-block:: python
+
+  from kubernetes.client import V1VolumeMount, V1Volume, V1PersistentVolumeClaimVolumeSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_volume", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      myapp_volume = V1Volume(
+          name='myapp-volume',
+          persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='myapp-pvc-rw'))
+
+      myapp_volume_mount = V1VolumeMount(mount_path='/root/myapp', name='myapp-volume')
+
+      task1 = KubernetesPodOperator(task_id='k8s_volume_read_task',
+                                    name='airflow_pod_volume_read',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    # Write the current date to a file on the mounted volume.
+                                    cmds=["sh", "-c",
+                                          'date > /root/myapp/date.txt',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task2 = KubernetesPodOperator(task_id='k8s_volume_write_task',
+                                    name='airflow_pod_volume_write',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    # Read the file back from the same volume and echo its contents.
+                                    cmds=["sh", "-c",
+                                          'echo "Reading date from date.txt : "$(cat /root/myapp/date.txt)',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task1 >> task2
+
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_read_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_write_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Running
+
+  {pod_launcher.py:136} INFO - Reading date from date.txt : Sat Dec 5 13:58:35 UTC 2020
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+Mounting Secrets as Volume
+==========================
+
+- Example Dag demonstrating use of ``Secret`` class which internally configures ``Volume`` and ``VolumeMount`` for
+  given secret.

Review comment:
       ```suggestion
   This example shows how to use an Airflow ``Secret`` object to inject a secret into a Pod as a volume mount.
   ```
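
To make the suggested wording concrete, here is a minimal sketch of what mounting a secret as a volume amounts to at the pod-spec level. It deliberately uses plain Python dictionaries rather than the Airflow ``Secret`` class, so it only illustrates the Kubernetes fields involved. The helper name `secret_volume_fragments`, the volume name `secret-volume`, and the mount path `/etc/db-credentials` are assumptions made for illustration; `db-credentials` is the secret created earlier in the guide.

```python
import json


def secret_volume_fragments(secret_name, mount_path, volume_name="secret-volume"):
    """Return the (volume, volume_mount) pod-spec fragments for a secret volume.

    Hypothetical helper for illustration only. It mirrors the Kubernetes
    pod-spec fields ``volumes[].secret.secretName`` and
    ``containers[].volumeMounts[]``.
    """
    # The volume refers to the Secret by name.
    volume = {"name": volume_name, "secret": {"secretName": secret_name}}
    # The mount refers to the volume by name and says where it appears in the container.
    volume_mount = {"name": volume_name, "mountPath": mount_path, "readOnly": True}
    return volume, volume_mount


volume, volume_mount = secret_volume_fragments("db-credentials", "/etc/db-credentials")
print(json.dumps({"volumes": [volume], "volumeMounts": [volume_mount]}, indent=2))
```

With a mount like this, each key of the secret (here `DB_USER` and `DB_PWD`) shows up as a file under the mount path, so the container could read `/etc/db-credentials/DB_USER` instead of an environment variable.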

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+==================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: myapp-pvc-rw
+  spec:
+    resources:
+      requests:
+        storage: 20Mi
+    accessModes:
+    - ReadWriteMany
+    storageClassName: ""
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pvc.yaml
+
+
+- Writing and Reading file from Persistent Volume using KubernetesPodOperator

Review comment:
       ```suggestion
   Now that we have created the PersistentVolume and PersistentVolumeClaim, we can write to and read from the volume using the code below:
   ```

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+=================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: myapp-pvc-rw
+  spec:
+    resources:
+      requests:
+        storage: 20Mi
+    accessModes:
+    - ReadWriteMany
+    storageClassName: ""
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pvc.yaml
+
+
+- Writing and Reading file from Persistent Volume using KubernetesPodOperator
+
+.. code-block:: python
+
+  from kubernetes.client import V1VolumeMount, V1Volume, V1PersistentVolumeClaimVolumeSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_volume", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      myapp_volume = V1Volume(
+          name='myapp-volume',
+          persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='myapp-pvc-rw'))
+
+      myapp_volume_mount = V1VolumeMount(mount_path='/root/myapp', name='myapp-volume')
+
+      # Note: despite "read" in its task_id, this task writes the current date to the volume.
+      task1 = KubernetesPodOperator(task_id='k8s_volume_read_task',
+                                    name='airflow_pod_volume_read',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'date > /root/myapp/date.txt',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      # Note: despite "write" in its task_id, this task reads the date back from the volume.
+      task2 = KubernetesPodOperator(task_id='k8s_volume_write_task',
+                                    name='airflow_pod_volume_write',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'echo "Reading date from date.txt : "$(cat /root/myapp/date.txt)',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task1 >> task2
+
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_read_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_write_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Running
+
+  {pod_launcher.py:136} INFO - Reading date from date.txt : Sat Dec 5 13:58:35 UTC 2020
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+Mounting Secrets as Volume
+==========================
+
+- Example DAG demonstrating the ``Secret`` class, which internally configures a ``Volume`` and a
+  ``VolumeMount`` for the given secret.
+
+.. code-block:: python
+
+  from kubernetes.client import V1Volume, V1SecretVolumeSource, V1VolumeMount
+
+  from airflow import DAG
+  from airflow.kubernetes.secret import Secret
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret_volume", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      secret = Secret('volume', '/etc/my-secret', 'db-credentials')
+      # Equivalent to the two lines below:
+      # secret_volume = V1Volume(name='my-secret-vol', secret=V1SecretVolumeSource(secret_name='db-credentials'))
+      # secret_volume_mount = V1VolumeMount(mount_path='/etc/my-secret', name='my-secret-vol', read_only=True)
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_volume_task',
+                                    name='airflow_pod_operator_secret_volume',
+                                    namespace='default',
+                                    secrets=[secret, ],
+                                    # secrets=[secret] is equivalent to the two lines below:
+                                    # volumes=[secret_volume, ],
+                                    # volume_mounts=[secret_volume_mount, ],
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Secret Directory Content "$(ls -l /etc/my-secret)'],
+                                    in_cluster=False,
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret_volume
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_volume_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-06T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-06T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-secret-volume-
+  c03db098442b45f2aeb58e2dbca8e78f
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  had an event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  Succeeded
+
+  {pod_launcher.py:136} INFO - Secret Directory Content
+  total 0
+  lrwxrwxrwx 1 root root 13 Dec 7 15:17 DB_PWD -> ..data/DB_PWD
+  lrwxrwxrwx 1 root root 14 Dec 7 15:17 DB_USER -> ..data/DB_USER
+
+  {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
+  {dagrun.py:444} INFO - Marking run <DagRun example_k8s_secret_volume
+  @ 2020-12-06 00:00:00+00:00: backfill__2020-12-06T00:00:00+00:00, externally triggered: False> successful
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+
+Defining Pod Resource Requirements and Limits
+=============================================
+
+
+- Defining ResourceRequirements for Pod
+
+
+.. code-block:: python
+
+  from kubernetes.client import V1ResourceRequirements
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_limit_resource", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      resource_req = V1ResourceRequirements(
+          requests={"cpu": 1, "memory": "100Mi"},
+          limits={"cpu": 2, "memory": "200Mi"},
+      )
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_limit_resource_task',
+                                    name='airflow_pod_limit_resource',
+                                    namespace='default',
+                                    image='alpine',
+                                    resources=resource_req,
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_limit_resource
+  AIRFLOW_CTX_TASK_ID=k8s_pod_limit_resource_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-05T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-05T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6
+  {pod_launcher.py:176} INFO - Event: airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6 Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6]
+
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_limit_resource,task_id=
+  k8s_pod_limit_resource_task, execution_date=20201205T000000, start_date=20201206T035456, end_date=20201206T035517
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+

Review comment:
       ```suggestion
   ```
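
For the resource requirements section that ends just above this comment, it may help readers to see why the example's numbers are valid: Kubernetes rejects a container whose request exceeds its limit for the same resource. The sketch below is only an illustration under stated assumptions. `parse_quantity` and `requests_within_limits` are hypothetical helpers, and the parser handles only a small subset of the Kubernetes quantity notation (plain numbers, `m`, `k`/`M`/`G`, and `Ki`/`Mi`/`Gi`/`Ti`).

```python
from decimal import Decimal

# Multipliers for a subset of Kubernetes quantity suffixes (assumption: the
# full notation, e.g. exponent forms, is intentionally not covered here).
_SUFFIXES = {
    "Ki": Decimal(2) ** 10,
    "Mi": Decimal(2) ** 20,
    "Gi": Decimal(2) ** 30,
    "Ti": Decimal(2) ** 40,
    "m": Decimal("0.001"),
    "k": Decimal(10) ** 3,
    "M": Decimal(10) ** 6,
    "G": Decimal(10) ** 9,
}


def parse_quantity(value):
    """Convert a quantity such as 1, '500m' or '100Mi' to a Decimal."""
    text = str(value)
    # Check longer suffixes first so two-character suffixes take precedence.
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if text.endswith(suffix):
            return Decimal(text[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(text)


def requests_within_limits(requests, limits):
    """Return True if every request with a matching limit does not exceed it."""
    return all(
        parse_quantity(requests[key]) <= parse_quantity(limits[key])
        for key in requests
        if key in limits
    )


# Values taken from the V1ResourceRequirements example in the diff.
requests = {"cpu": 1, "memory": "100Mi"}
limits = {"cpu": 2, "memory": "200Mi"}
print(requests_within_limits(requests, limits))
```

The same dictionaries are what the guide passes to `V1ResourceRequirements(requests=..., limits=...)`, so a check like this could run in a DAG unit test without a cluster.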

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating Task using KubernetesPodOperator with given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example:
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+=================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: myapp-pvc-rw
+  spec:
+    resources:
+      requests:
+        storage: 20Mi
+    accessModes:
+    - ReadWriteMany
+    storageClassName: ""
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pvc.yaml
+
+
+- Writing a file to the PersistentVolume and reading it back using KubernetesPodOperator
+
+.. code-block:: python
+
+  from kubernetes.client import V1VolumeMount, V1Volume, V1PersistentVolumeClaimVolumeSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_volume", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      myapp_volume = V1Volume(
+          name='myapp-volume',
+          persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='myapp-pvc-rw'))
+
+      myapp_volume_mount = V1VolumeMount(mount_path='/root/myapp', name='myapp-volume')
+
+      # Note: despite its task_id, this task writes date.txt to the volume.
+      task1 = KubernetesPodOperator(task_id='k8s_volume_read_task',
+                                    name='airflow_pod_volume_read',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'date > /root/myapp/date.txt',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      # Note: despite its task_id, this task reads date.txt back from the volume.
+      task2 = KubernetesPodOperator(task_id='k8s_volume_write_task',
+                                    name='airflow_pod_volume_write',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'echo "Reading date from date.txt : "$(cat /root/myapp/date.txt)',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task1 >> task2
+
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_read_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_write_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Running
+
+  {pod_launcher.py:136} INFO - Reading date from date.txt : Sat Dec 5 13:58:35 UTC 2020
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+Mounting Secrets as Volume
+==========================
+
+- Example DAG demonstrating the ``Secret`` class, which internally configures a ``Volume`` and a ``VolumeMount``
+  for the given secret.
+
+.. code-block:: python
+
+  from kubernetes.client import V1Volume, V1SecretVolumeSource, V1VolumeMount
+
+  from airflow import DAG
+  from airflow.kubernetes.secret import Secret
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret_volume", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      secret = Secret('volume', '/etc/my-secret', 'db-credentials')
+      # Equivalent to the two lines below:
+      # secret_volume = V1Volume(name='my-secret-vol', secret=V1SecretVolumeSource(secret_name='db-credentials'))
+      # secret_volume_mount = V1VolumeMount(mount_path='/etc/my-secret', name='my-secret-vol', read_only=True)
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_volume_task',
+                                    name='airflow_pod_operator_secret_volume',
+                                    namespace='default',
+                                    secrets=[secret, ],
+                                    # secrets=[secret] is equivalent to the two lines below:
+                                    # volumes=[secret_volume, ],
+                                    # volume_mounts=[secret_volume_mount, ],
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Secret Directory Content "$(ls -l /etc/my-secret)'],
+                                    in_cluster=False,
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret_volume
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_volume_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-06T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-06T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-secret-volume-
+  c03db098442b45f2aeb58e2dbca8e78f
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  had an event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  Succeeded
+
+  {pod_launcher.py:136} INFO - Secret Directory Content
+  total 0
+  lrwxrwxrwx 1 root root 13 Dec 7 15:17 DB_PWD -> ..data/DB_PWD
+  lrwxrwxrwx 1 root root 14 Dec 7 15:17 DB_USER -> ..data/DB_USER
+
+  {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
+  {dagrun.py:444} INFO - Marking run <DagRun example_k8s_secret_volume
+  @ 2020-12-06 00:00:00+00:00: backfill__2020-12-06T00:00:00+00:00, externally triggered: False> successful
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+

Review comment:
       ```suggestion
   ```
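
To illustrate what the task in the section above sees: when a Secret is mounted as a volume, each key shows up as a file under the mount path (the `ls -l` output in the logs shows `DB_PWD` and `DB_USER` as symlinks into `..data`). A minimal sketch of reading such a directory follows. `read_secret_dir` is a hypothetical helper, not part of Airflow or the Kubernetes client, and a temporary directory stands in for `/etc/my-secret` so the snippet can run outside a cluster.

```python
import tempfile
from pathlib import Path


def read_secret_dir(mount_path: str) -> dict:
    """Return {key: value} for every visible file under a mounted secret directory."""
    secrets = {}
    for entry in sorted(Path(mount_path).iterdir()):
        # Bookkeeping entries such as '..data' live next to the keys; skip them.
        if entry.name.startswith(".") or not entry.is_file():
            continue
        secrets[entry.name] = entry.read_text()
    return secrets


with tempfile.TemporaryDirectory() as mount:
    # Stand-in for the mounted 'db-credentials' secret (assumption for this sketch).
    Path(mount, "DB_USER").write_text("root")
    Path(mount, "DB_PWD").write_text("ent3r$ce@d00r")
    creds = read_secret_dir(mount)

print(sorted(creds))
```

Inside the Pod, the same helper would be pointed at the real mount path instead of the temporary directory.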

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating a task with KubernetesPodOperator that sets the given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing the secret inside the Pod as environment variables
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER: : $DB_USER:   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER: : root:   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+=================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: myapp-pvc-rw
+  spec:
+    resources:
+      requests:
+        storage: 20Mi
+    accessModes:
+    - ReadWriteMany
+    storageClassName: ""
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pvc.yaml
+
+
+- Writing a file to the PersistentVolume and reading it back using KubernetesPodOperator
+
+.. code-block:: python
+
+  from kubernetes.client import V1VolumeMount, V1Volume, V1PersistentVolumeClaimVolumeSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_volume", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      myapp_volume = V1Volume(
+          name='myapp-volume',
+          persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='myapp-pvc-rw'))
+
+      myapp_volume_mount = V1VolumeMount(mount_path='/root/myapp', name='myapp-volume')
+
+      # Note: despite its task_id, this task writes date.txt to the volume.
+      task1 = KubernetesPodOperator(task_id='k8s_volume_read_task',
+                                    name='airflow_pod_volume_read',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'date > /root/myapp/date.txt',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      # Note: despite its task_id, this task reads date.txt back from the volume.
+      task2 = KubernetesPodOperator(task_id='k8s_volume_write_task',
+                                    name='airflow_pod_volume_write',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'echo "Reading date from date.txt : "$(cat /root/myapp/date.txt)',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task1 >> task2
+
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_read_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_write_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Running
+
+  {pod_launcher.py:136} INFO - Reading date from date.txt : Sat Dec 5 13:58:35 UTC 2020
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+Mounting Secrets as Volume
+==========================
+
+- Example DAG demonstrating the ``Secret`` class, which internally configures a ``Volume`` and a ``VolumeMount``
+  for the given secret.
+
+.. code-block:: python
+
+  from kubernetes.client import V1Volume, V1SecretVolumeSource, V1VolumeMount
+
+  from airflow import DAG
+  from airflow.kubernetes.secret import Secret
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret_volume", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      secret = Secret('volume', '/etc/my-secret', 'db-credentials')
+      # Equivalent to the two lines below:
+      # secret_volume = V1Volume(name='my-secret-vol', secret=V1SecretVolumeSource(secret_name='db-credentials'))
+      # secret_volume_mount = V1VolumeMount(mount_path='/etc/my-secret', name='my-secret-vol', read_only=True)
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_volume_task',
+                                    name='airflow_pod_operator_secret_volume',
+                                    namespace='default',
+                                    secrets=[secret, ],
+                                    # secrets=[secret] is equivalent to the two lines below:
+                                    # volumes=[secret_volume, ],
+                                    # volume_mounts=[secret_volume_mount, ],
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Secret Directory Content "$(ls -l /etc/my-secret)'],
+                                    in_cluster=False,
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret_volume
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_volume_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-06T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-06T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-secret-volume-
+  c03db098442b45f2aeb58e2dbca8e78f
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  had an event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  Succeeded
+
+  {pod_launcher.py:136} INFO - Secret Directory Content
+  total 0
+  lrwxrwxrwx 1 root root 13 Dec 7 15:17 DB_PWD -> ..data/DB_PWD
+  lrwxrwxrwx 1 root root 14 Dec 7 15:17 DB_USER -> ..data/DB_USER
+
+  {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
+  {dagrun.py:444} INFO - Marking run <DagRun example_k8s_secret_volume
+  @ 2020-12-06 00:00:00+00:00: backfill__2020-12-06T00:00:00+00:00, externally triggered: False> successful
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+
+Defining Pod resource Requirements and Limits
+=============================================
+
+
+- Defining ResourceRequirements for Pod

Review comment:
       ```suggestion
   This example defines ResourceRequirements for the Pod, which let users set CPU, memory, and GPU requests and limits.
   ```
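
For illustration, a minimal sketch of what such requests and limits look like as plain data. The helpers `parse_quantity` and `build_resources` are hypothetical names, not part of Airflow or the Kubernetes client, and only a small subset of Kubernetes quantity suffixes is handled. The `nvidia.com/gpu` key is just an example of an extended resource name.

```python
from decimal import Decimal

# Subset of Kubernetes quantity suffixes (assumption: enough for this sketch).
_SUFFIXES = {
    "m": Decimal("0.001"),
    "Ki": Decimal(2) ** 10,
    "Mi": Decimal(2) ** 20,
    "Gi": Decimal(2) ** 30,
    "k": Decimal(10) ** 3,
    "M": Decimal(10) ** 6,
    "G": Decimal(10) ** 9,
}


def parse_quantity(value: str) -> Decimal:
    """Convert a quantity such as '500m' or '128Mi' into a plain number."""
    # Try longer suffixes first so that 'Mi' is matched before 'M' or 'm'.
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if value.endswith(suffix):
            return Decimal(value[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(value)


def build_resources(requests: dict, limits: dict) -> dict:
    """Build a requests/limits mapping, rejecting requests above their limit."""
    for name, requested in requests.items():
        if name in limits and parse_quantity(requested) > parse_quantity(limits[name]):
            raise ValueError(f"{name}: request {requested} exceeds limit {limits[name]}")
    return {"requests": dict(requests), "limits": dict(limits)}


resources = build_resources(
    requests={"cpu": "250m", "memory": "64Mi"},
    limits={"cpu": "500m", "memory": "128Mi", "nvidia.com/gpu": "1"},
)
print(resources)
```

The resulting mapping has the same `requests` / `limits` shape as the fields of `kubernetes.client.V1ResourceRequirements`, so it could be used to construct that model before handing it to the operator.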

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -82,29 +336,565 @@ equivalent YAML/JSON object spec for the Pod you would like to run.
 The YAML file can still be provided with the ``pod_template_file`` or even the Pod Spec constructed in Python via
 the ``full_pod_spec`` parameter which requires a Kubernetes ``V1Pod``.
 
-How to use private images (container registry)?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` will
-look for images hosted publicly on Dockerhub.
-To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a
-Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately
-specified in the ``image_pull_secrets`` parameter.
 
-Create the Secret using ``kubectl``:
+Defining Environment Variables for Pod
+======================================
+
 
-.. code-block:: none
+- Creating a task with KubernetesPodOperator that sets the given environment variables.
 
-    kubectl create secret docker-registry testquay \
-        --docker-server=quay.io \
-        --docker-username=<Profile name> \
-        --docker-password=<password>
+.. code-block:: python
 
-Then use it in your pod like so:
+  from kubernetes.client import V1EnvVar
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_private_image]
-    :end-before: [END howto_operator_k8s_private_image]
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator_env", start_date=days_ago(1), schedule_interval='@once',
+           tags=["example"]) as dag:
+
+      env_vars = [V1EnvVar(name='TIME_OUT', value='5'), V1EnvVar(name='ENV_TYPE', value='test')]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_env_task',
+                                    name='airflow_pod_operator_env',
+                                    namespace='default',
+                                    env_vars=env_vars,
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Task logs after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator_env
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_env_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 5   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-env-3824c08cb2f04af7928103a027189668 Succeeded
+  ...................................................................................................................
+
+
+
+Accessing ConfigMap
+===================
+
+
+- YAML file for creating ConfigMap in Kubernetes
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: ConfigMap
+  metadata:
+    name: myapp-config
+  data:
+    TIME_OUT: "15"
+    ENV_TYPE: "test"
+
+
+
+- Creating ConfigMap using ``kubectl`` command
+
+.. code-block:: bash
+
+  $ kubectl apply -f k8s_configmap.yaml
+    configmap/myapp-config created
+
+  $ kubectl describe configmaps myapp-config
+    Name:         myapp-config
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Data
+    ====
+    TIME_OUT:
+    ----
+    15
+    ENV_TYPE:
+    ----
+    test
+    Events:  <none>
+
+
+
+- Accessing variables from ConfigMap inside the Pod
+
+.. code-block:: python
+
+  from kubernetes.client import V1ConfigMapEnvSource, V1EnvFromSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_config_map", start_date=days_ago(1),
+          schedule_interval='@once', tags=["example"]) as dag:
+
+      config_map = [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name='myapp-config')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_config_map_task',
+                                    name='airflow_pod_operator_config_map',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=config_map,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables TIME_OUT : $TIME_OUT   ENV_TYPE :'
+                                          ' $ENV_TYPE"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_config_map
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_config_map_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+  {pod_launcher.py:136} INFO - Reading environment variables TIME_OUT : 15   ENV_TYPE : test
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-config-map-d472d9495b0741bc84e34c11d09c66fe Succeeded
+
+
+
+Accessing Secrets
+=================
+
+
+- Creating Secrets
+
+.. code-block:: bash
+
+  $ echo -n 'root' > DB_USER
+  $ echo -n 'ent3r$ce@d00r' > DB_PWD
+
+  $ kubectl create secret generic db-credentials --from-file=DB_USER  --from-file=DB_PWD
+
+  $ kubectl describe secrets db-credentials
+
+    Name:         db-credentials
+    Namespace:    default
+    Labels:       <none>
+    Annotations:  <none>
+
+    Type:  Opaque
+
+    Data
+    ====
+    DB_PWD:   13 bytes
+    DB_USER:  4 bytes
+
+
+- Accessing secret inside pod as environment variable
+
+.. code-block:: python
+
+  from kubernetes.client import V1EnvFromSource, V1SecretEnvSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      db_credentials = [V1EnvFromSource(secret_ref=V1SecretEnvSource(name='db-credentials')), ]
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_task',
+                                    name='airflow_pod_operator_secret',
+                                    namespace='default',
+                                    image='alpine',
+                                    env_from=db_credentials,
+                                    cmds=["sh", "-c",
+                                          'echo "Reading environment variables DB_USER : $DB_USER   DB_PWD :'
+                                          ' $DB_PWD"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+
+  {pod_launcher.py:136} INFO - Reading environment variables DB_USER : root   DB_PWD : ent3r$ce@d00r
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-3f7d6b3e5dcf4673aa1f584e26f1d012 Succeeded
+
+
+
+Mounting Persistent Volume to Pod
+==================================
+
+- Creating PersistentVolume
+
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolume
+  metadata:
+    name: myapp-pv
+  spec:
+    capacity:
+      storage: 20Mi
+    accessModes:
+      - ReadWriteMany
+    persistentVolumeReclaimPolicy: Retain
+    hostPath:
+      path: /tmp/myapp
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pv.yaml
+
+
+- Creating PersistentVolumeClaim
+
+.. code-block:: yaml
+
+  apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: myapp-pvc-rw
+  spec:
+    resources:
+      requests:
+        storage: 20Mi
+    accessModes:
+    - ReadWriteMany
+    storageClassName: ""
+
+
+.. code-block:: bash
+
+  $ kubectl apply -f myapp_pvc.yaml
+
+
+- Writing a file to the PersistentVolume and reading it back using KubernetesPodOperator
+
+.. code-block:: python
+
+  from kubernetes.client import V1VolumeMount, V1Volume, V1PersistentVolumeClaimVolumeSource
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_volume", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      myapp_volume = V1Volume(
+          name='myapp-volume',
+          persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name='myapp-pvc-rw'))
+
+      myapp_volume_mount = V1VolumeMount(mount_path='/root/myapp', name='myapp-volume')
+
+      # Note: despite the "read" in its task_id, this task writes date.txt to the volume.
+      task1 = KubernetesPodOperator(task_id='k8s_volume_read_task',
+                                    name='airflow_pod_volume_read',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'date > /root/myapp/date.txt',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      # Note: despite the "write" in its task_id, this task reads date.txt back from the volume.
+      task2 = KubernetesPodOperator(task_id='k8s_volume_write_task',
+                                    name='airflow_pod_volume_write',
+                                    namespace='default',
+                                    image='alpine',
+                                    volumes=[myapp_volume, ],
+                                    volume_mounts=[myapp_volume_mount, ],
+                                    cmds=["sh", "-c",
+                                          'echo "Reading date from date.txt : "$(cat /root/myapp/date.txt)',
+                                          ],
+                                    startup_timeout_seconds=60,
+                                    )
+
+      task1 >> task2
+
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_read_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 Succeeded
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-read-7055ebbfe703448ba6e8ba35487265e3 had an
+  event of type Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_volume
+  AIRFLOW_CTX_TASK_ID=k8s_volume_write_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-04T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-04T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an
+  event of type Running
+
+  {pod_launcher.py:136} INFO - Reading date from date.txt : Sat Dec 5 13:58:35 UTC 2020
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c had an event
+  of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-volume-write-23495d6738994e1d96765dfef49f345c Succeeded
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+Mounting Secrets as Volume
+==========================
+
+- Example DAG demonstrating the ``Secret`` class, which internally configures the ``Volume`` and ``VolumeMount`` for
+  the given secret.
+
+.. code-block:: python
+
+  from kubernetes.client import V1Volume, V1SecretVolumeSource, V1VolumeMount
+
+  from airflow import DAG
+  from airflow.kubernetes.secret import Secret
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_secret_volume", start_date=days_ago(1), schedule_interval='@once', tags=["example"]) as dag:
+
+      secret = Secret('volume', '/etc/my-secret', 'db-credentials')
+      # Equivalent to the two lines below
+      # secret_volume = V1Volume(name='my-secret-vol', secret=V1SecretVolumeSource(secret_name='db-credentials'))
+      # secret_volume_mount = V1VolumeMount(mount_path='/etc/my-secret', name='my-secret-vol', read_only=True)
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_secret_volume_task',
+                                    name='airflow_pod_operator_secret_volume',
+                                    namespace='default',
+                                    secrets=[secret, ],
+                                    # secrets=[secret] is equivalent to the two lines below
+                                    # volumes=[secret_volume, ],
+                                    # volume_mounts=[secret_volume_mount, ],
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Secret Directory Content "$(ls -l /etc/my-secret)'],
+                                    in_cluster=False,
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_secret_volume
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_secret_volume_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-06T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-06T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f had
+  an event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-secret-volume-
+  c03db098442b45f2aeb58e2dbca8e78f
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  had an event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-secret-volume-c03db098442b45f2aeb58e2dbca8e78f
+  Succeeded
+
+  {pod_launcher.py:136} INFO - Secret Directory Content
+  total 0
+  lrwxrwxrwx 1 root root 13 Dec 7 15:17 DB_PWD -> ..data/DB_PWD
+  lrwxrwxrwx 1 root root 14 Dec 7 15:17 DB_USER -> ..data/DB_USER
+
+  {taskinstance.py:1195} INFO - 0 downstream tasks scheduled from follow-on schedule check
+  {dagrun.py:444} INFO - Marking run <DagRun example_k8s_secret_volume
+  @ 2020-12-06 00:00:00+00:00: backfill__2020-12-06T00:00:00+00:00, externally triggered: False> successful
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+
+Defining Pod Resource Requirements and Limits
+=============================================
+
+
+- Defining ResourceRequirements for Pod
+
+
+.. code-block:: python
+
+  from kubernetes.client import V1ResourceRequirements
+
+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_limit_resource", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+
+      resource_req = V1ResourceRequirements(
+          requests={'cpu': 1, 'memory': '100Mi'},
+          limits={'cpu': 2, 'memory': '200Mi'},
+      )
+
+      task1 = KubernetesPodOperator(task_id='k8s_pod_limit_resource_task',
+                                    name='airflow_pod_limit_resource',
+                                    namespace='default',
+                                    image='alpine',
+                                    resources=resource_req,
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_limit_resource
+  AIRFLOW_CTX_TASK_ID=k8s_pod_limit_resource_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-05T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-05T00:00:00+00:00
+  {pod_launcher.py:176} INFO - Event: airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6 had an
+  event of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6
+  {pod_launcher.py:176} INFO - Event: airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6 had an
+  event of type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6 Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6]
+
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_limit_resource,task_id=
+  k8s_pod_limit_resource_task, execution_date=20201205T000000, start_date=20201206T035456, end_date=20201206T035517
+  {backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 |
+  running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
+  {backfill_job.py:830} INFO - Backfill done. Exiting.
+
+
+- Describing Pod configuration
+
+.. code-block:: bash
+
+  $ kubectl describe pod airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6
+
+    Name:         airflow-pod-limit-resource-ac4c107dd10549c89a2015f976e729d6
+    Namespace:    default
+    Priority:     0
+    Node:         minikube/192.168.49.2
+    Start Time:   Sun, 06 Dec 2020 09:25:02 +0530
+    Labels:       airflow_version=2.0.0b2
+                  dag_id=example_k8s_limit_resource
+                  execution_date=2020-12-05T0000000000-c71846343
+                  kubernetes_pod_operator=True
+                  task_id=k8s_pod_limit_resource_task
+                  try_number=1
+    Annotations:  <none>
+    Status:       Succeeded
+    IP:           172.17.0.7
+    IPs:
+      IP:  172.17.0.7
+    Containers:
+      base:
+        Container ID:  docker://819ae8713aefb51b4f9bbdd7567adf706ebcee402418e1c4ef358c0bab90bc8b
+        Image:         alpine
+        Image ID:      docker-pullable://alpine@sha256:c0e9560cda118f9ec63ddefb4a173a2b2a0347082d7dff7dc14272e7841a5b5a
+        Port:          <none>
+        Host Port:     <none>
+        Command:
+          sh
+          -c
+          echo "Hello World from pod [$HOSTNAME]"
+        State:          Terminated
+          Reason:       Completed
+          Exit Code:    0
+          Started:      Sun, 06 Dec 2020 09:25:14 +0530
+          Finished:     Sun, 06 Dec 2020 09:25:14 +0530
+        Ready:          False
+        Restart Count:  0
+
+        Limits:
+          cpu:     2
+          memory:  200Mi
+        Requests:
+          cpu:        1
+          memory:     100Mi
+
+        Environment:  <none>
+        Mounts:
+          /var/run/secrets/kubernetes.io/serviceaccount from default-token-ltgdm (ro)
+    ..................................................................................
+
+
+
+
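
The ``requests`` and ``limits`` in the hunk above use Kubernetes quantity strings such as ``100Mi``. As a rough illustration of how those values relate, here is a minimal sketch that converts the memory quantities to bytes and checks that each request does not exceed its limit. It assumes only the binary suffixes ``Ki``, ``Mi`` and ``Gi`` and whole-number CPU values; `parse_memory` and `check_requests_within_limits` are hypothetical helper names, not part of Airflow or the Kubernetes Python client.

```python
# Hypothetical helpers for illustration only; not an Airflow or kubernetes-client API.
_BINARY_SUFFIXES = {"Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3}


def parse_memory(quantity):
    """Convert a memory quantity such as '100Mi' to bytes (binary suffixes only)."""
    text = str(quantity)
    for suffix, factor in _BINARY_SUFFIXES.items():
        if text.endswith(suffix):
            return int(text[: -len(suffix)]) * factor
    return int(text)  # a plain integer is interpreted as bytes


def check_requests_within_limits(requests, limits):
    """Return True when the requested cpu and memory are both <= their limits."""
    cpu_ok = float(requests["cpu"]) <= float(limits["cpu"])
    memory_ok = parse_memory(requests["memory"]) <= parse_memory(limits["memory"])
    return cpu_ok and memory_ok


# Same values as the example DAG in the hunk above.
requests = {"cpu": 1, "memory": "100Mi"}
limits = {"cpu": 2, "memory": "200Mi"}
print(check_requests_within_limits(requests, limits))
```

A check like this could be run before creating the task, since a Pod whose request exceeds its limit is expected to be rejected by the Kubernetes API server rather than scheduled.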

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] VBhojawala commented on pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#issuecomment-763344070


   Hi @dimberman ,
   Thank you for reviewing the PR.
   I have added the text from the previous document and made the changes suggested in the review.





[GitHub] [airflow] dimberman commented on a change in pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#discussion_r570493888



##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -115,14 +905,169 @@ alongside the Pod. The Pod must write the XCom value into this location at the `
 
 See the following example on how this occurs:
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_write_xcom]
-    :end-before: [END howto_operator_k8s_write_xcom]
 
-Reference
-^^^^^^^^^
-For further information, look at:
+- Example DAG: a KubernetesPodOperator task writes the contents to be returned to ``/airflow/xcom/return.json``, and
+  the returned values are read using ``xcom_pull(task_ids=..., key=...)``.
+
+.. code-block:: python

Review comment:
       Can you please move these code segments into example_dag files or some other external file and pull in using tags? That way we can re-use these code snippets later and have python linting when editing them.
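
To make the request above concrete: the removed ``exampleinclude`` directive selects the lines of an example DAG file that sit between ``[START ...]`` and ``[END ...]`` marker comments. The following is a minimal sketch of that selection, not the actual Sphinx implementation; `extract_tagged_snippet` is a hypothetical helper and the file contents are a placeholder string, with only the tag name taken from the hunk above.

```python
def extract_tagged_snippet(source, tag):
    """Return the lines found between the '[START tag]' and '[END tag]' markers."""
    start_marker = f"[START {tag}]"
    end_marker = f"[END {tag}]"
    collected = []
    inside = False
    for line in source.splitlines():
        if end_marker in line:
            break  # stop at the closing marker
        if inside:
            collected.append(line)
        elif start_marker in line:
            inside = True  # start collecting on the next line
    return "\n".join(collected)


# Placeholder contents of an example DAG file; only the tagged region matters here.
example_dag_source = """\
from airflow import DAG

# [START howto_operator_k8s_write_xcom]
write_xcom = "placeholder for the KubernetesPodOperator task"
# [END howto_operator_k8s_write_xcom]
"""

print(extract_tagged_snippet(example_dag_source, "howto_operator_k8s_write_xcom"))
```

Keeping the snippet in a real Python file means it can be linted and reused, while the documentation only references the tag name.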







[GitHub] [airflow] eladkal commented on pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#issuecomment-813406151


   @VBhojawala can you take a look at the failing doc build?





[GitHub] [airflow] VBhojawala commented on a change in pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#discussion_r570704372



##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -115,14 +905,169 @@ alongside the Pod. The Pod must write the XCom value into this location at the `
 
 See the following example on how this occurs:
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_write_xcom]
-    :end-before: [END howto_operator_k8s_write_xcom]
 
-Reference
-^^^^^^^^^
-For further information, look at:
+- Example DAG: a KubernetesPodOperator task writes the contents to be returned to ``/airflow/xcom/return.json``, and
+  the returned values are read using ``xcom_pull(task_ids=..., key=...)``.
+
+.. code-block:: python

Review comment:
       I have extracted the example and tagged it in the doc.







[GitHub] [airflow] VBhojawala commented on a change in pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#discussion_r560684026



##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating a task using KubernetesPodOperator that prints "Hello World from pod [HOSTNAME]". Add
+  ``in_cluster=False`` to the KubernetesPodOperator constructor when running the examples with minikube.
+
+
+.. code-block:: python
+
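+  from airflow import DAG
+  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+  from airflow.utils.dates import days_ago
+
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),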
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+- Log output after running the example
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................
+
+- Getting the Kubernetes Pod using the ``dag_id`` and ``task_id`` labels that Airflow assigns automatically, then describing it.
+
+.. code-block:: bash
+
+  $ kubectl get pods -l dag_id=example_k8s_operator,task_id=k8s_pod_operator_task
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` allows
-you to create and run Pods on a Kubernetes cluster.
+    NAME                                                    READY   STATUS      RESTARTS   AGE
+    airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c   0/1     Completed   0          14m
 
-.. contents::
-  :depth: 1
-  :local:
+  $ kubectl describe pod airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
 
-.. note::
-  If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, consider
-  using the
-  :ref:`GKEStartPodOperator <howto/operator:GKEStartPodOperator>` operator as it
-  simplifies the Kubernetes authorization process.
+    Name:         airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+    Namespace:    default
+    Priority:     0
+    Node:         minikube/192.168.49.2
+    Start Time:   Fri, 04 Dec 2020 19:33:36 +0530
+    Labels:       airflow_version=2.0.0b2
+                  dag_id=example_k8s_operator
+                  execution_date=2020-12-03T0000000000-767fcb862
+                  kubernetes_pod_operator=True
+                  task_id=k8s_pod_operator_task
+                  try_number=1
+    Annotations:  <none>
+    Status:       Succeeded
+    IP:           172.17.0.7
+    IPs:
+      IP:  172.17.0.7
+    Containers:
+      base:
+        Container ID:  docker://56c91324dc925b0bad0d60474e35d8c7eb7fad7d8410ca123b657f1416207504
+        Image:         alpine
+        Image ID:      docker-pullable://alpine@sha256:c0e9560cda118f9ec63ddefb4a173a2b2a0347082d7dff7dc14272e7841a5b5a
+        Port:          <none>
+        Host Port:     <none>
+        Command:
+          sh
+          -c
+          echo "Hello World from pod [$HOSTNAME]"
+        State:          Terminated
+          Reason:       Completed
+          Exit Code:    0
+          Started:      Fri, 04 Dec 2020 19:33:43 +0530
+          Finished:     Fri, 04 Dec 2020 19:33:43 +0530
+        Ready:          False
+        Restart Count:  0
+        Environment:    <none>
+        Mounts:
+          /var/run/secrets/kubernetes.io/serviceaccount from default-token-ltgdm (ro)
+    Conditions:
+      Type              Status
+      Initialized       True
+      Ready             False
+      ContainersReady   False
+      PodScheduled      True
+    Volumes:
+      default-token-ltgdm:
+        Type:        Secret (a volume populated by a Secret)
+        SecretName:  default-token-ltgdm
+        Optional:    false
+    QoS Class:       BestEffort
+    Node-Selectors:  <none>
+    Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
+                     node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
+    Events:
+      Type    Reason     Age   From               Message
+      ----    ------     ----  ----               -------
+      Normal  Scheduled  15m   default-scheduler  Successfully assigned default/airflow-pod-operator
+                                                  -aed97ecd64854367ad7d0ff39f37859c to minikube
+      Normal  Pulling    15m   kubelet            Pulling image "alpine"
+      Normal  Pulled     15m   kubelet            Successfully pulled image "alpine" in 4.214686688s
+      Normal  Created    15m   kubelet            Created container base
+      Normal  Started    15m   kubelet            Started container base
 
-.. note::
-  The :doc:`Kubernetes executor <apache-airflow:executor/kubernetes>` is **not** required to use this operator.
 
-How does this operator work?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` uses the
-Kubernetes API to launch a pod in a Kubernetes cluster. By supplying an
-image URL and a command with optional arguments, the operator uses the Kube Python Client to generate a Kubernetes API
-request that dynamically launches those individual pods.
-Users can specify a kubeconfig file using the ``config_file`` parameter, otherwise the operator will default
-to ``~/.kube/config``.
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` enables task-level
-resource configuration and is optimal for custom Python
-dependencies that are not available through the public PyPI repository. It also allows users to supply a template
-YAML file using the ``pod_template_file`` parameter.
-Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
+Defining Environment Variables for Pod
+======================================
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:
+- Creating Task using KubernetesPodOperator with given environment variables.
 
 .. code-block:: python
 
-  from kubernetes.client import models as k8s
-
-With this API object, you can have access to all Kubernetes API objects in the form of python classes.
-Using this method will ensure correctness
-and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
-:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.
-
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_cluster_resources]
-    :end-before: [END howto_operator_k8s_cluster_resources]
-
-Difference between ``KubernetesPodOperator`` and Kubernetes object spec
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Review comment:
       I will add the text description from the old doc.







[GitHub] [airflow] github-actions[bot] closed pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #13405:
URL: https://github.com/apache/airflow/pull/13405


   





[GitHub] [airflow] VBhojawala commented on pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#issuecomment-761757642


   Hi @dimberman,
   Kindly review the PR.





[GitHub] [airflow] VBhojawala commented on pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#issuecomment-784052822


   Hi @dimberman,
   Kindly review the PR.





[GitHub] [airflow] dimberman commented on a change in pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#discussion_r560459900



##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to

Review comment:
       Hi Vivek, could you put examples of using pod_template_file and pod_template_spec at the top? Both of those could make this operator a lot easier to use.
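
A minimal sketch of what a template-file example could start from, assuming the operator loads the file named by ``pod_template_file`` with a YAML parser (JSON is a subset of YAML, so the standard library is enough here). The helper `write_pod_template`, the file name, and the manifest contents are hypothetical and only mirror the "Hello World" task quoted in this thread; the container name `base` is taken from the `kubectl describe` output in the guide.

```python
import json
import os
import tempfile


def write_pod_template(path, image="alpine"):
    """Write a minimal Pod manifest to `path` and return it as a dict.

    Hypothetical helper for illustration; not part of Airflow.
    """
    pod = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": "airflow-pod-operator", "namespace": "default"},
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    # "base" is the container name shown in the describe output.
                    "name": "base",
                    "image": image,
                    "command": [
                        "sh",
                        "-c",
                        'echo "Hello World from pod [$HOSTNAME]"',
                    ],
                }
            ],
        },
    }
    with open(path, "w") as handle:
        json.dump(pod, handle, indent=2)
    return pod


template_path = os.path.join(tempfile.mkdtemp(), "pod_template.json")
pod_manifest = write_pod_template(template_path)

# Assumed usage, not executed here (needs the cncf.kubernetes provider):
# task = KubernetesPodOperator(
#     task_id="k8s_pod_operator_task",
#     pod_template_file=template_path,
# )
```

Keeping the pod definition in a file means the same manifest can be inspected or applied outside Airflow, which is one reason a template-based example may be easier to follow than a long constructor call.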

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to
+  KubernetesPodOperator constructor when running examples with minikube.
+
+
+.. code-block:: python
+
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+- Executing / Debugging example and checking the logs

Review comment:
       Is there something you were going to write here? I just see logs...

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to
+  KubernetesPodOperator constructor when running examples with minikube.
+
+
+.. code-block:: python
+
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+- Executing / Debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................
+
+- Getting kubernetes pods using labels ``dag_id`` and ``task_id`` automatically assigned by Airflow and Describing it.
+
+.. code-block:: bash
+
+  $ kubectl get pods -l dag_id=example_k8s_operator,task_id=k8s_pod_operator_task

Review comment:
       good catch on this one!
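
The label lookup in the hunk above can be sketched as a small helper that builds the same `kubectl` command line from a DAG id and task id. The function names here are hypothetical, and the sketch assumes the label values match the ids exactly; Airflow may alter ids that contain characters not allowed in Kubernetes label values.

```python
def label_selector(labels):
    """Join a dict of labels into a kubectl-style equality selector."""
    return ",".join(f"{key}={value}" for key, value in labels.items())


def kubectl_get_pods_argv(dag_id, task_id):
    """Build the argv for listing pods launched for one task (hypothetical helper)."""
    selector = label_selector({"dag_id": dag_id, "task_id": task_id})
    return ["kubectl", "get", "pods", "-l", selector]


argv = kubectl_get_pods_argv("example_k8s_operator", "k8s_pod_operator_task")
print(" ".join(argv))
```

The list form can be passed to `subprocess.run` as-is, which avoids shell quoting issues if an id ever contains unusual characters.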

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to
+  KubernetesPodOperator constructor when running examples with minikube.

Review comment:
       I'd recommend we push people towards KinD instead of minikube.

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to
+  KubernetesPodOperator constructor when running examples with minikube.
+
+
+.. code-block:: python
+
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+- Executing / Debugging example and checking the logs
+
+.. code-block:: bash
+
+  {taskinstance.py:1230} INFO - Exporting the following env vars:
+  AIRFLOW_CTX_DAG_OWNER=airflow
+  AIRFLOW_CTX_DAG_ID=example_k8s_operator
+  AIRFLOW_CTX_TASK_ID=k8s_pod_operator_task
+  AIRFLOW_CTX_EXECUTION_DATE=2020-12-03T00:00:00+00:00
+  AIRFLOW_CTX_DAG_RUN_ID=backfill__2020-12-03T00:00:00+00:00
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event
+  of type Pending
+  {pod_launcher.py:113} WARNING - Pod not yet started: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of type
+  Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+
+  {pod_launcher.py:136} INFO - Hello World from pod [airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c]
+
+  {pod_launcher.py:176} INFO - Event: airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c had an event of
+  type Succeeded
+  {pod_launcher.py:289} INFO - Event with job id airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c Succeeded
+  {taskinstance.py:1136} INFO - Marking task as SUCCESS. dag_id=example_k8s_operator, task_id=k8s_pod_operator_task,
+  execution_date=20201203T000000, start_date=20201204T140331, end_date=20201204T140345
+  ................................................................................................................
+
+- Getting kubernetes pods using labels ``dag_id`` and ``task_id`` automatically assigned by Airflow and Describing it.
+
+.. code-block:: bash
+
+  $ kubectl get pods -l dag_id=example_k8s_operator,task_id=k8s_pod_operator_task
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` allows
-you to create and run Pods on a Kubernetes cluster.
+    NAME                                                    READY   STATUS      RESTARTS   AGE
+    airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c   0/1     Completed   0          14m
 
-.. contents::
-  :depth: 1
-  :local:
+  $ kubectl describe pod airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
 
-.. note::
-  If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, consider
-  using the
-  :ref:`GKEStartPodOperator <howto/operator:GKEStartPodOperator>` operator as it
-  simplifies the Kubernetes authorization process.
+    Name:         airflow-pod-operator-aed97ecd64854367ad7d0ff39f37859c
+    Namespace:    default
+    Priority:     0
+    Node:         minikube/192.168.49.2
+    Start Time:   Fri, 04 Dec 2020 19:33:36 +0530
+    Labels:       airflow_version=2.0.0b2
+                  dag_id=example_k8s_operator
+                  execution_date=2020-12-03T0000000000-767fcb862
+                  kubernetes_pod_operator=True
+                  task_id=k8s_pod_operator_task
+                  try_number=1
+    Annotations:  <none>
+    Status:       Succeeded
+    IP:           172.17.0.7
+    IPs:
+      IP:  172.17.0.7
+    Containers:
+      base:
+        Container ID:  docker://56c91324dc925b0bad0d60474e35d8c7eb7fad7d8410ca123b657f1416207504
+        Image:         alpine
+        Image ID:      docker-pullable://alpine@sha256:c0e9560cda118f9ec63ddefb4a173a2b2a0347082d7dff7dc14272e7841a5b5a
+        Port:          <none>
+        Host Port:     <none>
+        Command:
+          sh
+          -c
+          echo "Hello World from pod [$HOSTNAME]"
+        State:          Terminated
+          Reason:       Completed
+          Exit Code:    0
+          Started:      Fri, 04 Dec 2020 19:33:43 +0530
+          Finished:     Fri, 04 Dec 2020 19:33:43 +0530
+        Ready:          False
+        Restart Count:  0
+        Environment:    <none>
+        Mounts:
+          /var/run/secrets/kubernetes.io/serviceaccount from default-token-ltgdm (ro)
+    Conditions:
+      Type              Status
+      Initialized       True
+      Ready             False
+      ContainersReady   False
+      PodScheduled      True
+    Volumes:
+      default-token-ltgdm:
+        Type:        Secret (a volume populated by a Secret)
+        SecretName:  default-token-ltgdm
+        Optional:    false
+    QoS Class:       BestEffort
+    Node-Selectors:  <none>
+    Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
+                     node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
+    Events:
+      Type    Reason     Age   From               Message
+      ----    ------     ----  ----               -------
+      Normal  Scheduled  15m   default-scheduler  Successfully assigned default/airflow-pod-operator
+                                                  -aed97ecd64854367ad7d0ff39f37859c to minikube
+      Normal  Pulling    15m   kubelet            Pulling image "alpine"
+      Normal  Pulled     15m   kubelet            Successfully pulled image "alpine" in 4.214686688s
+      Normal  Created    15m   kubelet            Created container base
+      Normal  Started    15m   kubelet            Started container base
 
-.. note::
-  The :doc:`Kubernetes executor <apache-airflow:executor/kubernetes>` is **not** required to use this operator.
 
-How does this operator work?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` uses the
-Kubernetes API to launch a pod in a Kubernetes cluster. By supplying an
-image URL and a command with optional arguments, the operator uses the Kube Python Client to generate a Kubernetes API
-request that dynamically launches those individual pods.
-Users can specify a kubeconfig file using the ``config_file`` parameter, otherwise the operator will default
-to ``~/.kube/config``.
 
-The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` enables task-level
-resource configuration and is optimal for custom Python
-dependencies that are not available through the public PyPI repository. It also allows users to supply a template
-YAML file using the ``pod_template_file`` parameter.
-Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
+Defining Environment Variables for Pod
+======================================
 
-How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API
-like this:
+- Creating Task using KubernetesPodOperator with given environment variables.
 
 .. code-block:: python
 
-  from kubernetes.client import models as k8s
-
-With this API object, you can have access to all Kubernetes API objects in the form of python classes.
-Using this method will ensure correctness
-and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the
-:class:`~airflow.kubernetes.secret.Secret` class to simplify the process of generating secret volumes/env variables.
-
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
-    :language: python
-    :start-after: [START howto_operator_k8s_cluster_resources]
-    :end-before: [END howto_operator_k8s_cluster_resources]
-
-Difference between ``KubernetesPodOperator`` and Kubernetes object spec
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Review comment:
       Why are you getting rid of this?







[GitHub] [airflow] VBhojawala commented on a change in pull request #13405: KubernetesPodOperator Guide

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #13405:
URL: https://github.com/apache/airflow/pull/13405#discussion_r560683866



##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to

Review comment:
       OK, I will put that example on top.

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to
+  KubernetesPodOperator constructor when running examples with minikube.

Review comment:
       ok.

##########
File path: docs/apache-airflow-providers-cncf-kubernetes/operators.rst
##########
@@ -15,114 +15,1015 @@
     specific language governing permissions and limitations
     under the License.
 
+###########################
+KubernetesPodOperator Guide
+###########################
+
+.. contents:: :local:
 
 
 .. _howto/operator:KubernetesPodOperator:
 
-KubernetesPodOperator
-=====================
+Kubernetes Pod Operator "Hello World!"
+======================================
+
+- Creating Task using KubernetesPodOperator which prints "Hello World [HOSTNAME]".  Add ``in_cluster=False`` to
+  KubernetesPodOperator constructor when running examples with minikube.
+
+
+.. code-block:: python
+
+  with DAG(dag_id="example_k8s_operator", start_date=days_ago(1),
+           schedule_interval='@once', tags=["example"]) as dag:
+      task1 = KubernetesPodOperator(task_id='k8s_pod_operator_task',
+                                    name='airflow_pod_operator',
+                                    namespace='default',
+                                    image='alpine',
+                                    cmds=["sh", "-c",
+                                          'echo "Hello World from pod [$HOSTNAME]"'],
+                                    startup_timeout_seconds=60,
+                                    )
+
+- Executing / Debugging example and checking the logs

Review comment:
       I meant after executing the DAG with the configuration. I will update the line.



