You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:52 UTC
[06/23] samza git commit: SAMZA-1165; cleanup old zk versions.
SAMZA-1165; cleanup old zk versions.
Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Navina <na...@apache.org>, Shanthoosh V<sv...@linkedin.com>
Closes #239 from sborya/zkCleanUpBarrier1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4eb51531
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4eb51531
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4eb51531
Branch: refs/heads/0.14.0
Commit: 4eb51531387f018ea4350424edc45516ac3aea46
Parents: ce77716
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri Jul 21 13:43:18 2017 -0700
Committer: Jagadish <ja...@apache.org>
Committed: Fri Jul 21 13:43:18 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/TaskConfigJava.java | 2 +-
.../samza/zk/ScheduleAfterDebounceTime.java | 8 ++
.../samza/zk/ZkBarrierForVersionUpgrade.java | 7 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 3 +
.../main/java/org/apache/samza/zk/ZkUtils.java | 62 ++++++++++
.../java/org/apache/samza/zk/TestZkUtils.java | 112 ++++++++++++++++++-
.../samza/processor/TestZkStreamProcessor.java | 9 +-
.../processor/TestZkStreamProcessorBase.java | 2 +-
.../processor/TestZkStreamProcessorSession.java | 3 +
.../processor/TestZkLocalApplicationRunner.java | 3 +
10 files changed, 199 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index fc9f165..0bf078e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -36,7 +36,7 @@ import scala.collection.JavaConverters;
public class TaskConfigJava extends MapConfig {
// Task Configs
- private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
+ public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
public static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
// broadcast streams consumed by all tasks. e.g. kafka.foo#1
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 9b8ea66..6174063 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -48,6 +48,14 @@ public class ScheduleAfterDebounceTime {
// Action name when the Processor membership changes
public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+ /**
+ * Action name for the ZooKeeper cleanup task.
+ * The cleanup process is started after every new job model generation is complete.
+ * It deletes old versions of the job model and of the barrier.
+ * The number of versions to leave is controlled by {@code org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE}.
+ */
+ public static final String ON_ZK_CLEANUP = "OnCleanUp";
+
private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 196e431..3257ee1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -117,9 +117,7 @@ public class ZkBarrierForVersionUpgrade {
* @param version Version associated with the Barrier
*/
public void expire(String version) {
- zkUtils.writeData(
- keyBuilder.getBarrierStatePath(version),
- State.TIMED_OUT);
+ zkUtils.writeData(keyBuilder.getBarrierStatePath(version), State.TIMED_OUT);
}
/**
@@ -222,4 +220,7 @@ public class ZkBarrierForVersionUpgrade {
}
}
+ /**
+  * Extracts the numeric version from a barrier znode name or path.
+  * Everything after the last '_' is parsed as a decimal integer, e.g. "barrier_205" yields 205.
+  * If the string contains no '_', the whole string is parsed.
+  *
+  * @param barrierPath barrier znode name or path that ends with the version number
+  * @return the version number
+  * @throws NumberFormatException if the text after the last '_' is not a valid integer
+  */
+ public static int getVersion(String barrierPath) {
+   // parseInt returns a primitive directly; valueOf would box and immediately unbox
+   return Integer.parseInt(barrierPath.substring(barrierPath.lastIndexOf('_') + 1));
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 298c96e..dd08e3f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -60,6 +60,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
// TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
// with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
private static final int METADATA_CACHE_TTL_MS = 5000;
+ private static final int NUM_VERSIONS_TO_LEAVE = 10;
private final ZkUtils zkUtils;
private final String processorId;
@@ -202,6 +203,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
+
+ debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 8b6bc52..5df7114 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -22,6 +22,7 @@ package org.apache.samza.zk;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
@@ -32,6 +33,7 @@ import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
@@ -459,6 +461,66 @@ public class ZkUtils {
}
/**
+ * Cleans up old data from ZK: first the old barrier versions, then the old job models.
+ * @param numVersionsToLeave - number of versions to leave; passed to both cleanup steps
+ */
+ public void cleanupZK(int numVersionsToLeave) {
+ deleteOldBarrierVersions(numVersionsToLeave);
+ deleteOldJobModels(numVersionsToLeave);
+ }
+
+ /**
+  * Deletes the oldest job model znodes, leaving only the most recent ones.
+  * The children of the job model path are ordered by the integer value of their names.
+  *
+  * @param numVersionsToLeave number of the most recent job model versions to leave
+  */
+ void deleteOldJobModels(int numVersionsToLeave) {
+   // read current list of JMs
+   String path = keyBuilder.getJobModelPathPrefix();
+   LOG.info("about to delete jm path=" + path);
+   List<String> znodeIds = zkClient.getChildren(path);
+   // jm version name format is <num>; comparingInt avoids the overflow an int subtraction can produce
+   deleteOldVersionPath(path, znodeIds, numVersionsToLeave, Comparator.comparingInt(Integer::parseInt));
+ }
+
+ /**
+  * Deletes the oldest barrier znodes, leaving only the most recent ones.
+  * The children of the barrier path are ordered by the version number embedded in their names.
+  *
+  * @param numVersionsToLeave number of the most recent barrier versions to leave
+  */
+ void deleteOldBarrierVersions(int numVersionsToLeave) {
+   // read current list of barriers
+   String path = keyBuilder.getJobModelVersionBarrierPrefix();
+   LOG.info("about to delete old barrier paths from " + path);
+   List<String> znodeIds = zkClient.getChildren(path);
+   LOG.info("List of all zkNodes: " + znodeIds);
+   // barrier's name format is barrier_<num>; comparingInt avoids the overflow an int subtraction can produce
+   deleteOldVersionPath(path, znodeIds, numVersionsToLeave,
+       Comparator.comparingInt(ZkBarrierForVersionUpgrade::getVersion));
+ }
+
+ /**
+  * Deletes the oldest children of a ZK path, leaving the newest ones in place.
+  * The children are ordered with the given comparator and all but the last
+  * {@code numVersionsToLeave} are deleted recursively. A failure to delete one node is
+  * logged and does not stop the deletion of the remaining ones.
+  *
+  * @param path parent path of the znodes; nothing is done if it is empty
+  * @param zNodeIds names of the children of {@code path}; nothing is done if it is null
+  * @param numVersionsToLeave number of the newest znodes to leave
+  * @param c comparator that orders znode names from oldest to newest
+  */
+ void deleteOldVersionPath(String path, List<String> zNodeIds, int numVersionsToLeave, Comparator<String> c) {
+   if (StringUtils.isEmpty(path) || zNodeIds == null) {
+     LOG.warn("cannot cleanup empty path or empty list in ZK");
+     return;
+   }
+   int size = zNodeIds.size();
+   if (size <= numVersionsToLeave) {
+     // nothing old enough to delete
+     return;
+   }
+   // sort a copy so that the caller's list is not reordered as a side effect
+   List<String> sortedIds = new ArrayList<>(zNodeIds);
+   Collections.sort(sortedIds, c);
+   // the oldest znodes come first; everything before the last numVersionsToLeave entries is deleted
+   List<String> zNodesToDelete = sortedIds.subList(0, size - numVersionsToLeave);
+   LOG.info("Starting cleanup of old version zkNodes under " + path + ". Current size=" + size
+       + "; number to delete=" + zNodesToDelete.size() + "; numberToLeave=" + numVersionsToLeave);
+   for (String znodeId : zNodesToDelete) {
+     String pathToDelete = path + "/" + znodeId;
+     try {
+       LOG.info("deleting " + pathToDelete);
+       zkClient.deleteRecursive(pathToDelete);
+     } catch (Exception e) {
+       // best effort: keep going with the other nodes
+       LOG.warn("delete of node " + pathToDelete + " failed.", e);
+     }
+   }
+ }
+ /**
* Represents zookeeper processor node.
*/
private static class ProcessorNode {
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index e7a9aa2..b5953d1 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,10 @@
*/
package org.apache.samza.zk;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,8 +42,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
-import org.junit.rules.ExpectedException;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
public class TestZkUtils {
private static EmbeddedZookeeper zkServer = null;
@@ -102,7 +106,6 @@ public class TestZkUtils {
// Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath));
-
}
@Test
@@ -237,6 +240,111 @@ public class TestZkUtils {
Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
}
+ /**
+  * Publishes job model versions 101..109, cleans up all but the 5 most recent ones
+  * and verifies that exactly 105..109 are left.
+  */
+ @Test
+ public void testCleanUpZkJobModels() {
+   String root = zkUtils.getKeyBuilder().getJobModelPathPrefix();
+   zkUtils.getZkClient().createPersistent(root, true);
+
+   // generate multiple versions
+   for (int i = 101; i < 110; i++) {
+     zkUtils.publishJobModel(String.valueOf(i), null);
+   }
+
+   // clean all of the versions except 5 most recent ones
+   zkUtils.deleteOldJobModels(5);
+
+   // ZK gives no guarantee about the order of the children, so sort before comparing
+   List<String> zNodeIds = new ArrayList<>(zkUtils.getZkClient().getChildren(root));
+   Collections.sort(zNodeIds);
+   Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), zNodeIds);
+ }
+
+ /**
+  * Creates barriers for versions 200..209, cleans up all but the 5 most recent ones
+  * and verifies that exactly barrier_205..barrier_209 are left.
+  */
+ @Test
+ public void testCleanUpZkBarrierVersion() {
+   String barrierRoot = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
+   zkUtils.getZkClient().createPersistent(barrierRoot, true);
+   ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(barrierRoot, zkUtils, null);
+
+   int version = 200;
+   while (version < 210) {
+     List<String> ids = new ArrayList<>(Arrays.asList(version + "a", version + "b", version + "c"));
+     barrier.create(String.valueOf(version), ids);
+     version++;
+   }
+
+   zkUtils.deleteOldBarrierVersions(5);
+
+   List<String> expected =
+       Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209");
+   List<String> remaining = zkUtils.getZkClient().getChildren(barrierRoot);
+   Collections.sort(remaining);
+   Assert.assertEquals(expected, remaining);
+ }
+
+ /**
+  * Exercises deleteOldVersionPath directly on two paths: one whose children are named
+  * {@code <num>} and one whose children are named {@code some_<num>}.
+  */
+ @Test
+ public void testCleanUpZk() {
+   String pathA = "/path/testA";
+   String pathB = "/path/testB";
+   zkUtils.getZkClient().createPersistent(pathA, true);
+   zkUtils.getZkClient().createPersistent(pathB, true);
+
+   // Create 20 nodes under each path, every one with two children of its own
+   for (int i = 0; i < 20; i++) {
+     String p1 = pathA + "/" + i;
+     zkUtils.getZkClient().createPersistent(p1, true);
+     zkUtils.getZkClient().createPersistent(p1 + "/something1", true);
+     zkUtils.getZkClient().createPersistent(p1 + "/something2", true);
+
+     String p2 = pathB + "/some_" + i;
+     zkUtils.getZkClient().createPersistent(p2, true);
+     zkUtils.getZkClient().createPersistent(p2 + "/something1", true);
+     zkUtils.getZkClient().createPersistent(p2 + "/something2", true);
+   }
+
+   // empty list: the call must be a no-op
+   List<String> zNodeIds = new ArrayList<>();
+   zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, Comparator.<String>naturalOrder());
+
+   // pathA: leave the 10 newest of 0..19
+   zNodeIds = zkUtils.getZkClient().getChildren(pathA);
+   zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, Comparator.comparingInt(Integer::parseInt));
+
+   for (int i = 0; i < 10; i++) {
+     // should be gone
+     String p1 = pathA + "/" + i;
+     Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
+   }
+
+   for (int i = 10; i < 20; i++) {
+     // should still be there
+     String p1 = pathA + "/" + i;
+     Assert.assertTrue("path " + p1 + " does not exist", zkUtils.getZkClient().exists(p1));
+   }
+
+   // pathB: leave only the newest of some_0..some_19
+   zNodeIds = zkUtils.getZkClient().getChildren(pathB);
+   zkUtils.deleteOldVersionPath(pathB, zNodeIds, 1,
+       Comparator.comparingInt(s -> Integer.parseInt(s.substring(s.lastIndexOf("_") + 1))));
+
+   for (int i = 0; i < 19; i++) {
+     // should be gone; the children of pathB are named some_<num>
+     String p1 = pathB + "/some_" + i;
+     Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
+   }
+
+   for (int i = 19; i < 20; i++) {
+     // should still be there
+     String p1 = pathB + "/some_" + i;
+     Assert.assertTrue("path " + p1 + " does not exist", zkUtils.getZkClient().exists(p1));
+   }
+ }
+
public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
long delay = startDelayMs;
while (delay < maxDelayMs) {
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
index 1a13825..7253b29 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
@@ -20,7 +20,6 @@
package org.apache.samza.processor;
import java.util.concurrent.CountDownLatch;
-import org.apache.samza.zk.TestZkUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -146,8 +145,8 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
LOG.info("containerStopped latch = " + containerStopped1);
waitForProcessorToStartStop(containerStopped1);
- // let the system to publish and distribute the new job model
- TestZkUtils.sleepMs(600);
+ // read again the first batch
+ waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
// produce the second batch of the messages, starting with 'messageCount'
produceMessages(messageCount, inputTopic, messageCount);
@@ -226,8 +225,8 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
LOG.info("containerStopped latch = " + containerStopped2);
waitForProcessorToStartStop(containerStopped2);
- // let the system to publish and distribute the new job model
- TestZkUtils.sleepMs(300);
+ // read again the first batch
+ waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
// produce the second batch of the messages, starting with 'messageCount'
produceMessages(messageCount, inputTopic, messageCount);
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 4cbe252..f2f1585 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -287,7 +287,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
System.out.println("2read all. current count = " + leftEventsCount);
break;
}
- TestZkUtils.sleepMs(3000);
+ TestZkUtils.sleepMs(5000);
attempts--;
}
Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
index 10b08d9..6aab5e3 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
@@ -112,6 +112,9 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase {
waitForProcessorToStartStop(containerStopLatches[i]);
}
+ // read again the first batch
+ waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
+
produceMessages(messageCount, inputTopic, messageCount);
waitUntilMessagesLeftN(0);
http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 77e2a49..ebbe07b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -44,6 +44,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
+import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.JobModel;
@@ -83,6 +84,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
private static final String TEST_JOB_NAME = "test-job";
+ private static final String TASK_SHUTDOWN_MS = "5000";
private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
private String inputKafkaTopic;
@@ -169,6 +171,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
.put(ApplicationConfig.APP_ID, appId)
.put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
.put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
+ .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
.build();
Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));