You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/10 01:19:01 UTC

samza git commit: SAMZA-1272 : ZkCoordinationUtils deletes the entire Zk tree on reset

Repository: samza
Updated Branches:
  refs/heads/master b815e6d91 -> 8183781f2


SAMZA-1272 : ZkCoordinationUtils deletes the entire Zk tree on reset

* ZkCoordinationUtils has a reset interface that deletes the entire Zk tree. This is not desirable.
* Also, fixed flakiness in the unit tests by using a unique barrier name in each test. Otherwise, the tests share the same path on Zk and fail during concurrent test execution.

Author: Navina Ramesh <na...@apache.org>

Reviewers: Xinyu Liu <xi...@linkedin.com>, Prateek Maheshwari <pm...@linkedin.com>

Closes #173 from navina/SAMZA-1272


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8183781f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8183781f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8183781f

Branch: refs/heads/master
Commit: 8183781f2e2ea721013456bedcb7af8714581b58
Parents: b815e6d
Author: Navina Ramesh <na...@apache.org>
Authored: Tue May 9 18:18:52 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Tue May 9 18:18:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/zk/ZkControllerImpl.java   |  1 -
 .../apache/samza/zk/ZkCoordinationUtils.java    |  4 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  1 +
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 19 ++----
 .../zk/TestZkBarrierForVersionUpgrade.java      | 61 +++++---------------
 .../apache/samza/zk/TestZkLeaderElector.java    |  2 +-
 .../apache/samza/zk/TestZkProcessorLatch.java   |  2 +-
 7 files changed, 24 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 52bfef1..7821ef9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -76,7 +76,6 @@ public class ZkControllerImpl implements ZkController {
     if (isLeader()) {
       zkLeaderElector.resignLeadership();
     }
-    zkUtils.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 5a6c88a..965b32a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -41,7 +41,7 @@ public class ZkCoordinationUtils implements CoordinationUtils {
 
   @Override
   public void reset() {
-    zkUtils.deleteRoot();
+    zkUtils.close();
   }
 
   @Override
@@ -59,7 +59,7 @@ public class ZkCoordinationUtils implements CoordinationUtils {
     return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, zkConfig.getZkBarrierTimeoutMs());
   }
 
-  // TODO - SAMZA-1128 CoordinationService should directly depende on ZkUtils and DebounceTimer
+  // TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer
   public ZkUtils getZkUtils() {
     return zkUtils;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 37eba2d..91249de 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -91,6 +91,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
     debounceTimer.stopScheduler();
     zkController.stop();
+
     if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index be877a4..5c8fcf3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,11 +19,6 @@
 
 package org.apache.samza.zk;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
@@ -37,6 +32,12 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Util class to help manage Zk connection and ZkClient.
  * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
@@ -299,12 +300,4 @@ public class ZkUtils {
     LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
     zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
   }
-
-  public void deleteRoot() {
-    String rootPath = keyBuilder.getRootPath();
-    if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
-      LOG.info("Deleteing root: " + rootPath);
-      zkClient.deleteRecursive(rootPath);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 3f2c265..f1bb804 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -62,7 +62,6 @@ public class TestZkBarrierForVersionUpgrade {
 
     CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();
     coordinationUtils = serviceFactory.getCoordinationService(groupId, processorId, config);
-    coordinationUtils.reset();
   }
 
   @After
@@ -80,7 +79,7 @@ public class TestZkBarrierForVersionUpgrade {
   public void testZkBarrierForVersionUpgrade() {
     String barrierId = "b1";
     String ver = "1";
-    List<String> processors = new ArrayList<String>();
+    List<String> processors = new ArrayList<>();
     processors.add("p1");
     processors.add("p2");
 
@@ -94,29 +93,18 @@ public class TestZkBarrierForVersionUpgrade {
 
     barrier.start(ver, processors);
 
-    barrier.waitForBarrier(ver, "p1", new Runnable() {
-      @Override
-      public void run() {
-        s.p1 = true;
-      }
-    });
+    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
 
-    barrier.waitForBarrier(ver, "p2", new Runnable() {
-      @Override
-      public void run() {
-        s.p2 = true;
-      }
-    });
+    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
 
     Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100));
   }
 
   @Test
   public void testNegativeZkBarrierForVersionUpgrade() {
-
-    String barrierId = "b1";
+    String barrierId = "negativeZkBarrierForVersionUpgrade";
     String ver = "1";
-    List<String> processors = new ArrayList<String>();
+    List<String> processors = new ArrayList<>();
     processors.add("p1");
     processors.add("p2");
     processors.add("p3");
@@ -132,28 +120,18 @@ public class TestZkBarrierForVersionUpgrade {
 
     barrier.start(ver, processors);
 
-    barrier.waitForBarrier(ver, "p1", new Runnable() {
-      @Override
-      public void run() {
-        s.p1 = true;
-      }
-    });
+    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
 
-    barrier.waitForBarrier(ver, "p2", new Runnable() {
-      @Override
-      public void run() {
-        s.p2 = true;
-      }
-    });
+    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
 
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
   }
 
   @Test
   public void testZkBarrierForVersionUpgradeWithTimeOut() {
-    String barrierId = "b1";
+    String barrierId = "barrierTimeout";
     String ver = "1";
-    List<String> processors = new ArrayList<String>();
+    List<String> processors = new ArrayList<>();
     processors.add("p1");
     processors.add("p2");
     processors.add("p3");
@@ -169,28 +147,15 @@ public class TestZkBarrierForVersionUpgrade {
 
     barrier.start(ver, processors);
 
-    barrier.waitForBarrier(ver, "p1", new Runnable() {
-      @Override
-      public void run() {
-        s.p1 = true;
-      }
-    });
+    barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
 
-    barrier.waitForBarrier(ver, "p2", new Runnable() {
-      @Override
-      public void run() {
-        s.p2 = true;
-      }
-    });
+    barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
 
     // this node will join "too late"
-    barrier.waitForBarrier(ver, "p3", new Runnable() {
-      @Override
-      public void run() {
+    barrier.waitForBarrier(ver, "p3", () -> {
         TestZkUtils.sleepMs(300);
         s.p3 = true;
-      }
-    });
+      });
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 48dca9a..7cfad61 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -79,7 +79,7 @@ public class TestZkLeaderElector {
 
   @After
   public void testTeardown() {
-    testZkUtils.deleteRoot();
+    testZkUtils.getZkClient().deleteRecursive(KEY_BUILDER.getRootPath());
     testZkUtils.close();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/8183781f/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 06220d5..2385b32 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -67,7 +67,7 @@ public class TestZkProcessorLatch {
 
   @After
   public void testTeardown() {
-    testZkUtils.deleteRoot();
+    testZkUtils.getZkClient().deleteRecursive(KEY_BUILDER.getRootPath());
     testZkUtils.close();
   }