You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/10 01:19:01 UTC
samza git commit: SAMZA-1272 : ZkCoordinationUtils deletes the entire
Zk tree on reset
Repository: samza
Updated Branches:
refs/heads/master b815e6d91 -> 8183781f2
SAMZA-1272 : ZkCoordinationUtils deletes the entire Zk tree on reset
* ZkCoordinationUtils has a reset interface that deletes the entire Zk tree. This is not desirable.
* Also, fixed flakiness in unit test by unique barrier name in each of the unit tests. Otherwise, they share the same path on Zk and fail during concurrent test execution
Author: Navina Ramesh <na...@apache.org>
Reviewers: Xinyu Liu <xi...@linkedin.com>, Prateek Maheshwari <pm...@linkedin.com>
Closes #173 from navina/SAMZA-1272
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8183781f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8183781f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8183781f
Branch: refs/heads/master
Commit: 8183781f2e2ea721013456bedcb7af8714581b58
Parents: b815e6d
Author: Navina Ramesh <na...@apache.org>
Authored: Tue May 9 18:18:52 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Tue May 9 18:18:52 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/zk/ZkControllerImpl.java | 1 -
.../apache/samza/zk/ZkCoordinationUtils.java | 4 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 1 +
.../main/java/org/apache/samza/zk/ZkUtils.java | 19 ++----
.../zk/TestZkBarrierForVersionUpgrade.java | 61 +++++---------------
.../apache/samza/zk/TestZkLeaderElector.java | 2 +-
.../apache/samza/zk/TestZkProcessorLatch.java | 2 +-
7 files changed, 24 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 52bfef1..7821ef9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -76,7 +76,6 @@ public class ZkControllerImpl implements ZkController {
if (isLeader()) {
zkLeaderElector.resignLeadership();
}
- zkUtils.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 5a6c88a..965b32a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -41,7 +41,7 @@ public class ZkCoordinationUtils implements CoordinationUtils {
@Override
public void reset() {
- zkUtils.deleteRoot();
+ zkUtils.close();
}
@Override
@@ -59,7 +59,7 @@ public class ZkCoordinationUtils implements CoordinationUtils {
return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, zkConfig.getZkBarrierTimeoutMs());
}
- // TODO - SAMZA-1128 CoordinationService should directly depende on ZkUtils and DebounceTimer
+ // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer
public ZkUtils getZkUtils() {
return zkUtils;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 37eba2d..91249de 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -91,6 +91,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
debounceTimer.stopScheduler();
zkController.stop();
+
if (coordinatorListener != null) {
coordinatorListener.onCoordinatorStop();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index be877a4..5c8fcf3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,11 +19,6 @@
package org.apache.samza.zk;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
@@ -37,6 +32,12 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
/**
* Util class to help manage Zk connection and ZkClient.
* It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
@@ -299,12 +300,4 @@ public class ZkUtils {
LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
}
-
- public void deleteRoot() {
- String rootPath = keyBuilder.getRootPath();
- if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
- LOG.info("Deleteing root: " + rootPath);
- zkClient.deleteRecursive(rootPath);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 3f2c265..f1bb804 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -62,7 +62,6 @@ public class TestZkBarrierForVersionUpgrade {
CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();
coordinationUtils = serviceFactory.getCoordinationService(groupId, processorId, config);
- coordinationUtils.reset();
}
@After
@@ -80,7 +79,7 @@ public class TestZkBarrierForVersionUpgrade {
public void testZkBarrierForVersionUpgrade() {
String barrierId = "b1";
String ver = "1";
- List<String> processors = new ArrayList<String>();
+ List<String> processors = new ArrayList<>();
processors.add("p1");
processors.add("p2");
@@ -94,29 +93,18 @@ public class TestZkBarrierForVersionUpgrade {
barrier.start(ver, processors);
- barrier.waitForBarrier(ver, "p1", new Runnable() {
- @Override
- public void run() {
- s.p1 = true;
- }
- });
+ barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
- barrier.waitForBarrier(ver, "p2", new Runnable() {
- @Override
- public void run() {
- s.p2 = true;
- }
- });
+ barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100));
}
@Test
public void testNegativeZkBarrierForVersionUpgrade() {
-
- String barrierId = "b1";
+ String barrierId = "negativeZkBarrierForVersionUpgrade";
String ver = "1";
- List<String> processors = new ArrayList<String>();
+ List<String> processors = new ArrayList<>();
processors.add("p1");
processors.add("p2");
processors.add("p3");
@@ -132,28 +120,18 @@ public class TestZkBarrierForVersionUpgrade {
barrier.start(ver, processors);
- barrier.waitForBarrier(ver, "p1", new Runnable() {
- @Override
- public void run() {
- s.p1 = true;
- }
- });
+ barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
- barrier.waitForBarrier(ver, "p2", new Runnable() {
- @Override
- public void run() {
- s.p2 = true;
- }
- });
+ barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
}
@Test
public void testZkBarrierForVersionUpgradeWithTimeOut() {
- String barrierId = "b1";
+ String barrierId = "barrierTimeout";
String ver = "1";
- List<String> processors = new ArrayList<String>();
+ List<String> processors = new ArrayList<>();
processors.add("p1");
processors.add("p2");
processors.add("p3");
@@ -169,28 +147,15 @@ public class TestZkBarrierForVersionUpgrade {
barrier.start(ver, processors);
- barrier.waitForBarrier(ver, "p1", new Runnable() {
- @Override
- public void run() {
- s.p1 = true;
- }
- });
+ barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
- barrier.waitForBarrier(ver, "p2", new Runnable() {
- @Override
- public void run() {
- s.p2 = true;
- }
- });
+ barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
// this node will join "too late"
- barrier.waitForBarrier(ver, "p3", new Runnable() {
- @Override
- public void run() {
+ barrier.waitForBarrier(ver, "p3", () -> {
TestZkUtils.sleepMs(300);
s.p3 = true;
- }
- });
+ });
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 48dca9a..7cfad61 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -79,7 +79,7 @@ public class TestZkLeaderElector {
@After
public void testTeardown() {
- testZkUtils.deleteRoot();
+ testZkUtils.getZkClient().deleteRecursive(KEY_BUILDER.getRootPath());
testZkUtils.close();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 06220d5..2385b32 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -67,7 +67,7 @@ public class TestZkProcessorLatch {
@After
public void testTeardown() {
- testZkUtils.deleteRoot();
+ testZkUtils.getZkClient().deleteRecursive(KEY_BUILDER.getRootPath());
testZkUtils.close();
}