You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/10/14 07:29:27 UTC

[pulsar] 01/04: [Security] Upgrade the snakeyaml verion to 1.26 (#7994)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c263c6c89bc7f51bb2ba22b21e19035a833eaaaa
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Tue Sep 8 09:05:23 2020 +0800

    [Security] Upgrade the snakeyaml verion to 1.26 (#7994)
    
    Fixes #7928
    
    As https://nvd.nist.gov/vuln/detail/CVE-2017-18640 said, the `snakeyaml` < 1.26
    
    In `pulsar-functions` model:
    
    - The `snakeyaml` 1.19 appears to be included from dependency on org.apache.pulsar:pulsar-functions-secrets:jar:2.6.1 based on included dependency of io.kubernetes:client-java-api:jar:2.0.0:compile Fixed in 9.0.2
    
    - The `snakeyaml` 1.16 appears to be included from the dependency on org.apache.pulsar:pulsar-functions-instance:jar:2.6.1 based on io.prometheus.jmx:collector:jar:0.12.0 Fixed in 0.13.0
    
    - The 1.17 org.apache.pulsar.tests:integration:test-jar:tests:2.6.1:test depends on org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:6.3.2:test Fixed in elasticsearch >= 7.7.1 (7.9.1 current)
    
    (cherry picked from commit 5adbc1f99958a09fc1cd5191fbe2ff76a70b4c85)
---
 distribution/server/src/assemble/LICENSE.bin.txt   | 23 ++++++---
 pom.xml                                            |  4 +-
 pulsar-functions/runtime/pom.xml                   |  2 +-
 .../auth/KubernetesFunctionAuthProvider.java       |  4 +-
 .../auth/KubernetesSecretsTokenAuthProvider.java   | 34 ++++++-------
 .../BasicKubernetesManifestCustomizer.java         |  2 +-
 .../kubernetes/KubernetesManifestCustomizer.java   |  4 +-
 .../runtime/kubernetes/KubernetesRuntime.java      | 59 +++++++++++-----------
 .../kubernetes/KubernetesRuntimeFactory.java       | 10 ++--
 .../KubernetesSecretsTokenAuthProviderTest.java    | 18 +++----
 .../kubernetes/KubernetesRuntimeFactoryTest.java   | 10 ++--
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  6 +--
 .../runtime/process/ProcessRuntimeTest.java        |  6 +--
 pulsar-functions/secrets/pom.xml                   |  2 +-
 .../DefaultSecretsProviderConfigurator.java        |  2 +-
 .../KubernetesSecretsProviderConfigurator.java     | 10 ++--
 .../SecretsProviderConfigurator.java               |  6 +--
 .../pulsar/io/elasticsearch/ElasticSearchSink.java | 11 ++--
 .../integration/io/ElasticSearchSinkTester.java    |  5 +-
 19 files changed, 113 insertions(+), 105 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 17cfffe..632c604 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -322,7 +322,16 @@ The Apache Software License, Version 2.0
      - com.fasterxml.jackson.module-jackson-module-jsonSchema-2.11.1.jar
  * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.6.2.jar
  * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-1.12.0.jar
- * Gson -- com.google.code.gson-gson-2.8.2.jar
+ * Joda -- org.joda-joda-convert-2.2.1.jar
+ * Bitbucket -- org.bitbucket.b_c-jose4j-0.7.2.jar
+ * Gson
+    - com.google.code.gson-gson-2.8.2.jar
+    - io.gsonfire-gson-fire-1.8.4.jar
+ * Sundrio
+    - io.sundr-builder-annotations-0.21.0.jar
+    - io.sundr-resourcecify-annotations-0.21.0.jar
+    - io.sundr-sundr-codegen-0.21.0.jar
+    - io.sundr-sundr-core-0.21.0.jar
  * Guava
     - com.google.guava-guava-25.1-jre.jar
  * J2ObjC Annotations -- com.google.j2objc-j2objc-annotations-1.1.jar
@@ -371,7 +380,7 @@ The Apache Software License, Version 2.0
     - io.prometheus-simpleclient_servlet-0.5.0.jar
     - io.prometheus-simpleclient_log4j2-0.5.0.jar
     - io.prometheus-simpleclient_jetty-0.5.0.jar
-    - io.prometheus.jmx-collector-0.12.0.jar
+    - io.prometheus.jmx-collector-0.14.0.jar
     - io.prometheus-simpleclient_caffeine-0.5.0.jar
  * Bean Validation API -- javax.validation-validation-api-1.1.0.Final.jar
  * Log4J
@@ -469,12 +478,12 @@ The Apache Software License, Version 2.0
   * @FreeBuilder
     - org.inferred-freebuilder-1.14.9.jar
   * Squareup
-    - com.squareup.okhttp-logging-interceptor-2.7.5.jar
-    - com.squareup.okhttp-okhttp-ws-2.7.5.jar
+    - com.squareup.okhttp3-logging-interceptor-3.14.3.jar
+    - com.squareup.okhttp3-okhttp-3.14.3.jar
   * Kubernetes Client
-    - io.kubernetes-client-java-2.0.0.jar
-    - io.kubernetes-client-java-api-2.0.0.jar
-    - io.kubernetes-client-java-proto-2.0.0.jar
+    - io.kubernetes-client-java-9.0.2.jar
+    - io.kubernetes-client-java-api-9.0.2.jar
+    - io.kubernetes-client-java-proto-9.0.2.jar
   * Joda Time
     - joda-time-2.10.1.jar
     - joda-time-joda-time-2.10.1.jar
diff --git a/pom.xml b/pom.xml
index 167aa3e..a984502 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,7 +202,7 @@ flexible messaging model and an intuitive client API.</description>
     <hdfs-offload-version3>3.2.0</hdfs-offload-version3>
     <org.eclipse.jetty-hdfs-offload>9.3.24.v20180605</org.eclipse.jetty-hdfs-offload>
     <test-hdfs-offload-jetty>9.3.24.v20180605</test-hdfs-offload-jetty>
-    <elasticsearch.version>6.3.2</elasticsearch.version>
+    <elasticsearch.version>7.9.1</elasticsearch.version>
     <presto.version>332</presto.version>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
@@ -212,7 +212,7 @@ flexible messaging model and an intuitive client API.</description>
     <hbase.version>1.4.9</hbase.version>
     <guava.version>25.1-jre</guava.version>
     <jcip.version>1.0</jcip.version>
-    <prometheus-jmx.version>0.12.0</prometheus-jmx.version>
+    <prometheus-jmx.version>0.14.0</prometheus-jmx.version>
     <confluent.version>5.3.2</confluent.version>
     <kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
     <aircompressor.version>0.16</aircompressor.version>
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 691f178..5e81f5e 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -63,7 +63,7 @@
     <dependency>
       <groupId>io.kubernetes</groupId>
       <artifactId>client-java</artifactId>
-      <version>2.0.0</version>
+      <version>9.0.2</version>
       <scope>compile</scope>
       <exclusions>
         <exclusion>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
index dacd3a1..912f0d3 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.functions.auth;
 
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1StatefulSet;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.common.util.Reflections;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index 7119c74..98b1cfa 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -19,16 +19,16 @@
 package org.apache.pulsar.functions.auth;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.kubernetes.client.ApiException;
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1DeleteOptions;
-import io.kubernetes.client.models.V1ObjectMeta;
-import io.kubernetes.client.models.V1PodSpec;
-import io.kubernetes.client.models.V1Secret;
-import io.kubernetes.client.models.V1SecretVolumeSource;
-import io.kubernetes.client.models.V1StatefulSet;
-import io.kubernetes.client.models.V1Volume;
-import io.kubernetes.client.models.V1VolumeMount;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1DeleteOptions;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1Secret;
+import io.kubernetes.client.openapi.models.V1SecretVolumeSource;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -173,16 +173,12 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
                 .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
                 .supplier(() -> {
                     try {
-                        V1DeleteOptions v1DeleteOptions = new V1DeleteOptions();
-                        v1DeleteOptions.setGracePeriodSeconds(0L);
-                        v1DeleteOptions.setPropagationPolicy("Foreground");
-
                         // make sure secretName is not null or empty string.
                         // If deleteNamespacedSecret is called and secret name is null or empty string
                         // it will delete all the secrets in the namespace
                         coreClient.deleteNamespacedSecret(secretName,
-                                kubeNamespace, v1DeleteOptions, "true",
-                                null, null, null);
+                                kubeNamespace, null, "true",
+                                0, null, "Foreground", null);
                     } catch (ApiException e) {
                         // if already deleted
                         if (e.getCode() == HTTP_NOT_FOUND) {
@@ -302,11 +298,11 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
                             .data(buildSecretMap(token));
 
                     try {
-                        coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null);
+                        coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, "true", null);
                     } catch (ApiException e) {
                         if (e.getCode() == HTTP_CONFLICT) {
                             try {
-                                coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret, null);
+                                coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret, null, "true", null);
                                 return Actions.ActionResult.builder().success(true).build();
 
                             } catch (ApiException e1) {
@@ -358,7 +354,7 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
                             .metadata(new V1ObjectMeta().name(getSecretName(id)))
                             .data(buildSecretMap(token));
                     try {
-                        coreClient.createNamespacedSecret(kubeNamespace, v1Secret, "true");
+                        coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, "true", null);
                     } catch (ApiException e) {
                         // already exists
                         if (e.getCode() == HTTP_CONFLICT) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
index 714d1d5..dc14b6a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.functions.runtime.kubernetes;
 
 import com.google.gson.Gson;
 import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.models.*;
+import io.kubernetes.client.openapi.models.*;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
index b389cf6..fb69dfa 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.functions.runtime.kubernetes;
 
-import io.kubernetes.client.models.V1Service;
-import io.kubernetes.client.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 56f6aaf..aff638c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -26,33 +26,34 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gson.Gson;
 import com.google.protobuf.Empty;
-import com.squareup.okhttp.Response;
+
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import io.kubernetes.client.ApiException;
-import io.kubernetes.client.apis.AppsV1Api;
-import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.models.V1Container;
-import io.kubernetes.client.models.V1ContainerPort;
-import io.kubernetes.client.models.V1DeleteOptions;
-import io.kubernetes.client.models.V1EnvVar;
-import io.kubernetes.client.models.V1EnvVarSource;
-import io.kubernetes.client.models.V1LabelSelector;
-import io.kubernetes.client.models.V1ObjectFieldSelector;
-import io.kubernetes.client.models.V1ObjectMeta;
-import io.kubernetes.client.models.V1PodList;
-import io.kubernetes.client.models.V1PodSpec;
-import io.kubernetes.client.models.V1PodTemplateSpec;
-import io.kubernetes.client.models.V1ResourceRequirements;
-import io.kubernetes.client.models.V1Service;
-import io.kubernetes.client.models.V1ServicePort;
-import io.kubernetes.client.models.V1ServiceSpec;
-import io.kubernetes.client.models.V1StatefulSet;
-import io.kubernetes.client.models.V1StatefulSetSpec;
-import io.kubernetes.client.models.V1Toleration;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1DeleteOptions;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1LabelSelector;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServicePort;
+import io.kubernetes.client.openapi.models.V1ServiceSpec;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Toleration;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import okhttp3.Response;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -418,7 +419,7 @@ public class KubernetesRuntime implements Runtime {
                 .supplier(() -> {
                     final V1Service response;
                     try {
-                        response = coreClient.createNamespacedService(jobNamespace, service, null);
+                        response = coreClient.createNamespacedService(jobNamespace, service, null, "true", null);
                     } catch (ApiException e) {
                         // already exists
                         if (e.getCode() == HTTP_CONFLICT) {
@@ -503,7 +504,7 @@ public class KubernetesRuntime implements Runtime {
                 .supplier(() -> {
                     final V1StatefulSet response;
                     try {
-                        response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet, null);
+                        response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet, null, "true", null);
                     } catch (ApiException e) {
                         // already exists
                         if (e.getCode() == HTTP_CONFLICT) {
@@ -554,8 +555,8 @@ public class KubernetesRuntime implements Runtime {
                         // https://github.com/kubernetes-client/java/issues/86
                         response = appsClient.deleteNamespacedStatefulSetCall(
                                 statefulSetName,
-                                jobNamespace, options, null,
-                                null, null, null,
+                                jobNamespace, null, "true",
+                                5, null, "Foreground",
                                 null, null)
                                 .execute();
                     } catch (ApiException e) {
@@ -705,9 +706,9 @@ public class KubernetesRuntime implements Runtime {
                         // https://github.com/kubernetes-client/java/issues/86
                         response = coreClient.deleteNamespacedServiceCall(
                                 serviceName,
-                                jobNamespace, options, null,
-                                null, null,
-                                null, null, null).execute();
+                                jobNamespace, null, "true",
+                                0, null,
+                                "Foreground", null, null).execute();
                     } catch (ApiException e) {
                         // if already deleted
                         if (e.getCode() == HTTP_NOT_FOUND) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 6414783..c864820 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -20,11 +20,11 @@
 package org.apache.pulsar.functions.runtime.kubernetes;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.kubernetes.client.ApiClient;
-import io.kubernetes.client.Configuration;
-import io.kubernetes.client.apis.AppsV1Api;
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1ConfigMap;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.Configuration;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
 import io.kubernetes.client.util.Config;
 import java.nio.file.Paths;
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
index 074904c..38dd914 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -23,14 +23,14 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-import io.kubernetes.client.ApiException;
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1Container;
-import io.kubernetes.client.models.V1PodSpec;
-import io.kubernetes.client.models.V1PodTemplateSpec;
-import io.kubernetes.client.models.V1Secret;
-import io.kubernetes.client.models.V1StatefulSet;
-import io.kubernetes.client.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Secret;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
 
 import java.util.Collections;
 import java.util.Optional;
@@ -103,7 +103,7 @@ public class KubernetesSecretsTokenAuthProviderTest {
     @Test
     public void testCacheAuthData() throws ApiException {
         CoreV1Api coreV1Api = mock(CoreV1Api.class);
-        doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString());
+        doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString(), anyString(), anyString());
         KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
         kubernetesSecretsTokenAuthProvider.initialize(coreV1Api,  null, (fd) -> "default");
         Function.FunctionDetails funcDetails = Function.FunctionDetails.newBuilder().setTenant("test-tenant").setNamespace("test-ns").setName("test-func").build();
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index 4f3ad2e..18e1171 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -19,11 +19,11 @@
 
 package org.apache.pulsar.functions.runtime.kubernetes;
 
-import io.kubernetes.client.apis.AppsV1Api;
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1ConfigMap;
-import io.kubernetes.client.models.V1PodSpec;
-import io.kubernetes.client.models.V1StatefulSet;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.functions.Resources;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index ab6325e..9a725a9 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -22,10 +22,10 @@ package org.apache.pulsar.functions.runtime.kubernetes;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.protobuf.util.JsonFormat;
-import io.kubernetes.client.apis.AppsV1Api;
-import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.models.*;
+import io.kubernetes.client.openapi.models.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.InstanceConfig;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 83477c1..638780c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -37,9 +37,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import io.kubernetes.client.apis.AppsV1Api;
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1PodSpec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/secrets/pom.xml b/pulsar-functions/secrets/pom.xml
index 1bd5aad..a7f62cf 100644
--- a/pulsar-functions/secrets/pom.xml
+++ b/pulsar-functions/secrets/pom.xml
@@ -34,7 +34,7 @@
   <dependency>
     <groupId>io.kubernetes</groupId>
     <artifactId>client-java</artifactId>
-    <version>2.0.0</version>
+    <version>9.0.2</version>
     <scope>compile</scope>
     <exclusions>
       <exclusion>
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
index 8d4dbff..2a94ece 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.functions.secretsproviderconfigurator;
 
 import com.google.gson.reflect.TypeToken;
-import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodSpec;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index 8d4e34e..81e50fd 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -20,9 +20,13 @@ package org.apache.pulsar.functions.secretsproviderconfigurator;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import io.kubernetes.client.apis.AppsV1Api;
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.*;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1SecretKeySelector;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
index 349559e..e51242e 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.functions.secretsproviderconfigurator;
 
-import io.kubernetes.client.apis.AppsV1Api;
-import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1PodSpec;
 import org.apache.pulsar.functions.proto.Function;
 
 import java.lang.reflect.Type;
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index d55510d..f44465e 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -41,10 +41,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.*;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
 
@@ -86,7 +83,7 @@ public class ElasticSearchSink implements Sink<byte[]> {
         indexRequest.source(keyValue.getValue(), XContentType.JSON);
 
         try {
-        IndexResponse indexResponse = getClient().index(indexRequest);
+        IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
             if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
                 record.ack();
             } else {
@@ -105,7 +102,7 @@ public class ElasticSearchSink implements Sink<byte[]> {
     private void createIndexIfNeeded() throws IOException {
         GetIndexRequest request = new GetIndexRequest();
         request.indices(elasticSearchConfig.getIndexName());
-        boolean exists = getClient().indices().exists(request);
+        boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
 
         if (!exists) {
             CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
@@ -114,7 +111,7 @@ public class ElasticSearchSink implements Sink<byte[]> {
                .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
                .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
 
-            CreateIndexResponse ciresp = getClient().indices().create(cireq);
+            CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
             if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
                 throw new RuntimeException("Unable to create index.");
             }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
index eee208e..0f1cc8a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
@@ -65,8 +66,8 @@ public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer>
         searchRequest.types("doc");
         
         try {
-            SearchResponse searchResult = elasticClient.search(searchRequest);
-            assertTrue(searchResult.getHits().getTotalHits() > 0, searchResult.toString());
+            SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
+            assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString());
         } catch (Exception e) {
             fail("Encountered exception on validating elastic search results", e);
         }