You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2023/01/30 18:18:17 UTC

[samza] branch master updated: [SAMZA-2773] Migrate zkclient library (#1651)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new b15b4f86b [SAMZA-2773] Migrate zkclient library (#1651)
b15b4f86b is described below

commit b15b4f86bba868dec9bb20b16482f4119f2b5907
Author: Katie Liu <ka...@linkedin.com>
AuthorDate: Mon Jan 30 10:18:10 2023 -0800

    [SAMZA-2773] Migrate zkclient library (#1651)
    
    * Migrate zkclient library
    The I0Itec zkclient has not been maintained for a while, so this moves to the Apache Helix copy.
    * Remove unused imports
    * PR comments: version constant, remove NonNull
---
 build.gradle                                       |  9 +++++---
 gradle/dependency-versions.gradle                  |  2 ++
 .../org/apache/samza/zk/ZkCoordinationUtils.java   |  2 +-
 .../samza/zk/ZkCoordinationUtilsFactory.java       |  2 +-
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |  9 ++++++--
 .../apache/samza/zk/ZkJobCoordinatorFactory.java   |  2 +-
 .../java/org/apache/samza/zk/ZkLeaderElector.java  |  2 +-
 .../java/org/apache/samza/zk/ZkMetadataStore.java  |  4 ++--
 .../org/apache/samza/zk/ZkStringSerializer.java    |  4 ++--
 .../src/main/java/org/apache/samza/zk/ZkUtils.java | 14 ++++++------
 .../samza/zk/TestZkBarrierForVersionUpgrade.java   |  2 +-
 .../apache/samza/zk/TestZkClusterMembership.java   |  2 +-
 .../org/apache/samza/zk/TestZkDistributedLock.java |  2 +-
 .../org/apache/samza/zk/TestZkJobCoordinator.java  |  2 +-
 .../org/apache/samza/zk/TestZkLeaderElector.java   | 16 +++++++-------
 .../org/apache/samza/zk/TestZkMetadataStore.java   |  8 ++++++-
 .../java/org/apache/samza/zk/TestZkNamespace.java  |  4 ++--
 .../org/apache/samza/zk/TestZkProcessorLatch.java  |  2 +-
 .../test/java/org/apache/samza/zk/TestZkUtils.java | 25 +++++++++++-----------
 .../samza/sql/client/impl/SamzaExecutor.java       |  4 ++--
 .../samza/processor/TestZkStreamProcessorBase.java |  4 ++--
 .../processor/TestZkLocalApplicationRunner.java    |  2 +-
 22 files changed, 69 insertions(+), 54 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7b3dc4b88..dcafcbf68 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,11 +220,12 @@ project(":samza-core_$scalaSuffix") {
 
   dependencies {
     compile project(':samza-api')
-    compile("com.101tec:zkclient:$zkClientVersion") {
+    compile("org.apache.helix:zookeeper-api:$helixVersion") {
       exclude module: 'junit:junit'
       // exclude the slf4j implementation since it should be up to the application to choose the logging implementation
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
+    compile "org.xerial.snappy:snappy-java:$snappyVersion"
     compile "com.google.guava:guava:$guavaVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
@@ -434,7 +435,8 @@ project(":samza-kafka_$scalaSuffix") {
     compile project(':samza-api')
     compile project(":samza-core_$scalaSuffix")
     compile "org.scala-lang:scala-library:$scalaVersion"
-    compile "com.101tec:zkclient:$zkClientVersion"
+    compile "org.apache.helix:zookeeper-api:$helixVersion"
+    compile "org.xerial.snappy:snappy-java:$snappyVersion"
     compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
     compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
     compile "org.apache.kafka:kafka_$scalaSuffix:$kafkaVersion"
@@ -981,7 +983,8 @@ project(":samza-test_$scalaSuffix") {
     compile "junit:junit:$junitVersion"
     compile "org.hamcrest:hamcrest-all:$hamcrestVersion"
     testCompile "org.apache.kafka:kafka_$scalaSuffix:$kafkaVersion:test"
-    testCompile "com.101tec:zkclient:$zkClientVersion"
+    testCompile "org.apache.helix:zookeeper-api:$helixVersion"
+    testCompile "org.xerial.snappy:snappy-java:$snappyVersion"
     testCompile project(":samza-kafka_$scalaSuffix")
     testCompile "org.apache.kafka:kafka_$scalaSuffix:$kafkaVersion:test"
     testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index f12eca4cf..2a1bfbd71 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -29,6 +29,7 @@
   gsonVersion = "2.8.6"
   guavaVersion = "30.1-jre"
   hamcrestVersion = "1.3"
+  helixVersion = "1.1.0"
   httpClientVersion = "4.4.1"
   jacksonVersion = "2.13.3"
   jerseyVersion = "2.22.1"
@@ -44,6 +45,7 @@
   powerMockVersion = "1.6.6"
   rocksdbVersion = "7.8.3"
   scalaTestVersion = "3.0.1"
+  snappyVersion = "1.1.8.4"
   slf4jVersion = "1.7.7"
   yarnVersion = "2.10.1"
   zkClientVersion = "0.11"
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 1e3b58f40..10756a38c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.ClusterMembership;
 import org.apache.samza.coordinator.CoordinationUtils;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index 0cf93c997..f5ea2f5aa 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -19,7 +19,7 @@
 package org.apache.samza.zk;
 
 import com.google.common.base.Strings;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 447143a98..09ef501c4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -804,7 +804,6 @@ public class ZkJobCoordinator implements JobCoordinator {
       }
     }
 
-    @Override
     public void handleNewSession() {
       zkSessionMetrics.zkNewSessions.inc();
       LOG.info("Got new session created event for processor=" + processorId);
@@ -814,6 +813,12 @@ public class ZkJobCoordinator implements JobCoordinator {
       zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
     }
 
+    @Override
+    public void handleNewSession(final String sessionId) {
+      LOG.info("Handling new session with sessionId=" + sessionId);
+      handleNewSession();
+    }
+
     @Override
     public void handleSessionEstablishmentError(Throwable error) {
       // this means we cannot connect to zookeeper to establish a session
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 7e13e911b..f05d5fee9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 9171d9d6e..82da0f66d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.LeaderElector;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
index 697607ef3..010522f58 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -27,14 +27,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.zip.CRC32;
-import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.BytesPushThroughSerializer;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.SamzaException;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java b/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java
index da3807c98..3eafeb29c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java
@@ -19,8 +19,8 @@
 package org.apache.samza.zk;
 
 import java.io.UnsupportedEncodingException;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 
 
 public class ZkStringSerializer implements ZkSerializer {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 3084be8bb..b1de5425b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -32,12 +32,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
@@ -73,7 +73,7 @@ import org.slf4j.LoggerFactory;
  * <p>
  *   <b>Note on Session Management:</b>
  *   Session management, if needed, should be handled by the caller. This can be done by implementing
- *   {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
+ *   {@link org.apache.helix.zookeeper.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
  *   callbacks are invoked in the context of the Event thread of the ZkClient. So, it is advised to do non-blocking
  *   processing in the callbacks.
  * </p>
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index b57654957..bc48f47ba 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -19,7 +19,7 @@
 package org.apache.samza.zk;
 
 import com.google.common.collect.ImmutableList;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
index 1775db52e..15d2b1634 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.apache.samza.util.NoOpMetricsRegistry;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
index b5d85aa80..848909108 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
@@ -19,7 +19,7 @@
 package org.apache.samza.zk;
 
 import java.time.Duration;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.apache.samza.util.NoOpMetricsRegistry;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 586dfc0d9..1172733d5 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -27,7 +27,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 52f30d8a6..2752900ec 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.apache.samza.util.NoOpMetricsRegistry;
@@ -147,17 +147,17 @@ public class TestZkLeaderElector {
 
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1);
     leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
 
     // Processor-2
     ZkUtils zkUtils2 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2);
     leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
 
     // Processor-3
     ZkUtils zkUtils3 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3);
     leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true);
 
     Assert.assertEquals(0, testZkUtils.getSortedActiveProcessorsZnodes().size());
@@ -409,12 +409,12 @@ public class TestZkLeaderElector {
     // Processor-1
 
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1);
     leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
 
     // Processor-2
     ZkUtils zkUtils2 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2);
     leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
 
     // Before Leader Election
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 94f188e35..99ad18fee 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -41,7 +41,13 @@ public class TestZkMetadataStore {
 
   private static final Random RANDOM = new Random();
 
-  private static final int VALUE_SIZE_IN_BYTES = 1024  * 1024 * 10; // 10 MB
+  // In 101tec version, this constant was 1024 * 1024 * 10
+  // The limit changed to 1024 * 1000 after helix migration.
+  // A larger number would throw an exception.
+  // See: https://github.com/apache/helix/blob/654636e54268907deb2e12d32913455cc543b436/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java#L2386
+  // The limit can be set through a system property, but not in unit tests.
+  // https://github.com/apache/helix/blob/654636e54268907deb2e12d32913455cc543b436/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java#L106
+  private static final int VALUE_SIZE_IN_BYTES = 512000;
 
   private static EmbeddedZookeeper zkServer;
 
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
index 1defccbd4..c4f15aa34 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
@@ -20,8 +20,8 @@
 package org.apache.samza.zk;
 
 import com.google.common.base.Strings;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.samza.SamzaException;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.AfterClass;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 7876679b7..ffa891b51 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.apache.samza.util.NoOpMetricsRegistry;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 13fc13bbd..ec7113c96 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -30,11 +30,11 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.BooleanSupplier;
 import com.google.common.collect.ImmutableList;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
@@ -53,6 +53,10 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -522,7 +526,7 @@ public class TestZkUtils {
 
   @Test
   public void testCloseShouldRetryOnceOnInterruptedException() {
-    ZkClient zkClient = Mockito.mock(ZkClient.class);
+    ZkClient zkClient = mock(ZkClient.class);
     ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
 
     Mockito.doThrow(new ZkInterruptedException(new InterruptedException()))
@@ -538,7 +542,7 @@ public class TestZkUtils {
   public void testCloseShouldTearDownZkConnectionOnInterruptedException() throws Exception {
     CountDownLatch latch = new CountDownLatch(1);
     // Establish connection with the zookeeper server.
-    ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.getPort());
+    ZkClient zkClient = mock(ZkClient.class);
     ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
 
     Thread threadToInterrupt = new Thread(() -> {
@@ -552,14 +556,9 @@ public class TestZkUtils {
 
     threadToInterrupt.start();
 
-    Field field = ZkClient.class.getDeclaredField("_closed");
-    field.setAccessible(true);
-
-    Assert.assertFalse(field.getBoolean(zkClient));
-
     threadToInterrupt.interrupt();
     threadToInterrupt.join();
 
-    Assert.assertTrue(field.getBoolean(zkClient));
+    verify(zkClient, times(1)).close();
   }
 }
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 3e9126dd3..acfae4ff7 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.sql.client.impl;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.samza.SamzaException;
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index d80d0ed73..d6bacc5cb 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -30,8 +30,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import kafka.utils.TestUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 23821cc06..be5a344d1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -38,7 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.Partition;