You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2023/01/30 18:18:17 UTC
[samza] branch master updated: [SAMZA-2773] Migrate zkclient library (#1651)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new b15b4f86b [SAMZA-2773] Migrate zkclient library (#1651)
b15b4f86b is described below
commit b15b4f86bba868dec9bb20b16482f4119f2b5907
Author: Katie Liu <ka...@linkedin.com>
AuthorDate: Mon Jan 30 10:18:10 2023 -0800
[SAMZA-2773] Migrate zkclient library (#1651)
* Migrate zkclient library
I0Itec zkclient has not been maintained for a while. Moving to the apache helix copy.
* Remove unused imports
* PR comments: version constant, remove NonNull
---
build.gradle | 9 +++++---
gradle/dependency-versions.gradle | 2 ++
.../org/apache/samza/zk/ZkCoordinationUtils.java | 2 +-
.../samza/zk/ZkCoordinationUtilsFactory.java | 2 +-
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 9 ++++++--
.../apache/samza/zk/ZkJobCoordinatorFactory.java | 2 +-
.../java/org/apache/samza/zk/ZkLeaderElector.java | 2 +-
.../java/org/apache/samza/zk/ZkMetadataStore.java | 4 ++--
.../org/apache/samza/zk/ZkStringSerializer.java | 4 ++--
.../src/main/java/org/apache/samza/zk/ZkUtils.java | 14 ++++++------
.../samza/zk/TestZkBarrierForVersionUpgrade.java | 2 +-
.../apache/samza/zk/TestZkClusterMembership.java | 2 +-
.../org/apache/samza/zk/TestZkDistributedLock.java | 2 +-
.../org/apache/samza/zk/TestZkJobCoordinator.java | 2 +-
.../org/apache/samza/zk/TestZkLeaderElector.java | 16 +++++++-------
.../org/apache/samza/zk/TestZkMetadataStore.java | 8 ++++++-
.../java/org/apache/samza/zk/TestZkNamespace.java | 4 ++--
.../org/apache/samza/zk/TestZkProcessorLatch.java | 2 +-
.../test/java/org/apache/samza/zk/TestZkUtils.java | 25 +++++++++++-----------
.../samza/sql/client/impl/SamzaExecutor.java | 4 ++--
.../samza/processor/TestZkStreamProcessorBase.java | 4 ++--
.../processor/TestZkLocalApplicationRunner.java | 2 +-
22 files changed, 69 insertions(+), 54 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7b3dc4b88..dcafcbf68 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,11 +220,12 @@ project(":samza-core_$scalaSuffix") {
dependencies {
compile project(':samza-api')
- compile("com.101tec:zkclient:$zkClientVersion") {
+ compile("org.apache.helix:zookeeper-api:$helixVersion") {
exclude module: 'junit:junit'
// exclude the slf4j implementation since it should be up to the application to choose the logging implementation
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
+ compile "org.xerial.snappy:snappy-java:$snappyVersion"
compile "com.google.guava:guava:$guavaVersion"
compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
@@ -434,7 +435,8 @@ project(":samza-kafka_$scalaSuffix") {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.scala-lang:scala-library:$scalaVersion"
- compile "com.101tec:zkclient:$zkClientVersion"
+ compile "org.apache.helix:zookeeper-api:$helixVersion"
+ compile "org.xerial.snappy:snappy-java:$snappyVersion"
compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
compile "org.apache.kafka:kafka_$scalaSuffix:$kafkaVersion"
@@ -981,7 +983,8 @@ project(":samza-test_$scalaSuffix") {
compile "junit:junit:$junitVersion"
compile "org.hamcrest:hamcrest-all:$hamcrestVersion"
testCompile "org.apache.kafka:kafka_$scalaSuffix:$kafkaVersion:test"
- testCompile "com.101tec:zkclient:$zkClientVersion"
+ testCompile "org.apache.helix:zookeeper-api:$helixVersion"
+ testCompile "org.xerial.snappy:snappy-java:$snappyVersion"
testCompile project(":samza-kafka_$scalaSuffix")
testCompile "org.apache.kafka:kafka_$scalaSuffix:$kafkaVersion:test"
testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index f12eca4cf..2a1bfbd71 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -29,6 +29,7 @@
gsonVersion = "2.8.6"
guavaVersion = "30.1-jre"
hamcrestVersion = "1.3"
+ helixVersion = "1.1.0"
httpClientVersion = "4.4.1"
jacksonVersion = "2.13.3"
jerseyVersion = "2.22.1"
@@ -44,6 +45,7 @@
powerMockVersion = "1.6.6"
rocksdbVersion = "7.8.3"
scalaTestVersion = "3.0.1"
+ snappyVersion = "1.1.8.4"
slf4jVersion = "1.7.7"
yarnVersion = "2.10.1"
zkClientVersion = "0.11"
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 1e3b58f40..10756a38c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -18,7 +18,7 @@
*/
package org.apache.samza.zk;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.ClusterMembership;
import org.apache.samza.coordinator.CoordinationUtils;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index 0cf93c997..f5ea2f5aa 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -19,7 +19,7 @@
package org.apache.samza.zk;
import com.google.common.base.Strings;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 447143a98..09ef501c4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -29,7 +29,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -804,7 +804,6 @@ public class ZkJobCoordinator implements JobCoordinator {
}
}
- @Override
public void handleNewSession() {
zkSessionMetrics.zkNewSessions.inc();
LOG.info("Got new session created event for processor=" + processorId);
@@ -814,6 +813,12 @@ public class ZkJobCoordinator implements JobCoordinator {
zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils));
}
+ @Override
+ public void handleNewSession(final String sessionId) {
+ LOG.info("Handling new session with sessionId=" + sessionId);
+ handleNewSession();
+ }
+
@Override
public void handleSessionEstablishmentError(Throwable error) {
// this means we cannot connect to zookeeper to establish a session
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 7e13e911b..f05d5fee9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -19,7 +19,7 @@
package org.apache.samza.zk;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 9171d9d6e..82da0f66d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.LeaderElector;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
index 697607ef3..010522f58 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
@@ -27,14 +27,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;
-import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.BytesPushThroughSerializer;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.SamzaException;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java b/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java
index da3807c98..3eafeb29c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkStringSerializer.java
@@ -19,8 +19,8 @@
package org.apache.samza.zk;
import java.io.UnsupportedEncodingException;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
public class ZkStringSerializer implements ZkSerializer {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 3084be8bb..b1de5425b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -32,12 +32,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
@@ -73,7 +73,7 @@ import org.slf4j.LoggerFactory;
* <p>
* <b>Note on Session Management:</b>
* Session management, if needed, should be handled by the caller. This can be done by implementing
- * {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
+ * {@link org.apache.helix.zookeeper.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
* callbacks are invoked in the context of the Event thread of the ZkClient. So, it is advised to do non-blocking
* processing in the callbacks.
* </p>
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index b57654957..bc48f47ba 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -19,7 +19,7 @@
package org.apache.samza.zk;
import com.google.common.collect.ImmutableList;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.testUtils.EmbeddedZookeeper;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
index 1775db52e..15d2b1634 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
@@ -18,7 +18,7 @@
*/
package org.apache.samza.zk;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.apache.samza.util.NoOpMetricsRegistry;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
index b5d85aa80..848909108 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
@@ -19,7 +19,7 @@
package org.apache.samza.zk;
import java.time.Duration;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.apache.samza.util.NoOpMetricsRegistry;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 586dfc0d9..1172733d5 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -27,7 +27,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 52f30d8a6..2752900ec 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -23,9 +23,9 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.samza.SamzaException;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.apache.samza.util.NoOpMetricsRegistry;
@@ -147,17 +147,17 @@ public class TestZkLeaderElector {
// Processor-1
ZkUtils zkUtils1 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
+ ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1);
leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
// Processor-2
ZkUtils zkUtils2 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+ ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2);
leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
// Processor-3
ZkUtils zkUtils3 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
+ ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3);
leaderElector3.setLeaderElectorListener(() -> isLeader3.res = true);
Assert.assertEquals(0, testZkUtils.getSortedActiveProcessorsZnodes().size());
@@ -409,12 +409,12 @@ public class TestZkLeaderElector {
// Processor-1
ZkUtils zkUtils1 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
+ ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1);
leaderElector1.setLeaderElectorListener(() -> isLeader1.res = true);
// Processor-2
ZkUtils zkUtils2 = getZkUtilsWithNewClient();
- ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+ ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2);
leaderElector2.setLeaderElectorListener(() -> isLeader2.res = true);
// Before Leader Election
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 94f188e35..99ad18fee 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -41,7 +41,13 @@ public class TestZkMetadataStore {
private static final Random RANDOM = new Random();
- private static final int VALUE_SIZE_IN_BYTES = 1024 * 1024 * 10; // 10 MB
+ // In 101tec version, this constant was 1024 * 1024 * 10
+ // The limit changed to 1024 * 1000 after helix migration.
+ // A larger number would throw an exception.
+ // See: https://github.com/apache/helix/blob/654636e54268907deb2e12d32913455cc543b436/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java#L2386
+ // The limit can be set through a system property, but not in unit tests.
+ // https://github.com/apache/helix/blob/654636e54268907deb2e12d32913455cc543b436/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java#L106
+ private static final int VALUE_SIZE_IN_BYTES = 512000;
private static EmbeddedZookeeper zkServer;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
index 1defccbd4..c4f15aa34 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java
@@ -20,8 +20,8 @@
package org.apache.samza.zk;
import com.google.common.base.Strings;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.samza.SamzaException;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.junit.AfterClass;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 7876679b7..ffa891b51 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.apache.samza.util.NoOpMetricsRegistry;
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 13fc13bbd..ec7113c96 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -30,11 +30,11 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BooleanSupplier;
import com.google.common.collect.ImmutableList;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
@@ -53,6 +53,10 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class TestZkUtils {
private static EmbeddedZookeeper zkServer = null;
private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -522,7 +526,7 @@ public class TestZkUtils {
@Test
public void testCloseShouldRetryOnceOnInterruptedException() {
- ZkClient zkClient = Mockito.mock(ZkClient.class);
+ ZkClient zkClient = mock(ZkClient.class);
ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
Mockito.doThrow(new ZkInterruptedException(new InterruptedException()))
@@ -538,7 +542,7 @@ public class TestZkUtils {
public void testCloseShouldTearDownZkConnectionOnInterruptedException() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// Establish connection with the zookeeper server.
- ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.getPort());
+ ZkClient zkClient = mock(ZkClient.class);
ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
Thread threadToInterrupt = new Thread(() -> {
@@ -552,14 +556,9 @@ public class TestZkUtils {
threadToInterrupt.start();
- Field field = ZkClient.class.getDeclaredField("_closed");
- field.setAccessible(true);
-
- Assert.assertFalse(field.getBoolean(zkClient));
-
threadToInterrupt.interrupt();
threadToInterrupt.join();
- Assert.assertTrue(field.getBoolean(zkClient));
+ verify(zkClient, times(1)).close();
}
}
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 3e9126dd3..acfae4ff7 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -19,8 +19,8 @@
package org.apache.samza.sql.client.impl;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.samza.SamzaException;
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index d80d0ed73..d6bacc5cb 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -30,8 +30,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.utils.TestUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 23821cc06..be5a344d1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -38,7 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.Partition;