You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/15 05:02:36 UTC
samza git commit: SAMZA-1385: Fix zookeeper path conflict between
LocalApplicationRunner and ZkJobCoordinator
Repository: samza
Updated Branches:
refs/heads/master a0aae5292 -> ef6bb8196
SAMZA-1385: Fix zookeeper path conflict between LocalApplicationRunner and ZkJobCoordinator
Tested the fix with the sample page-view ad-click joiner job.
navina, sborya, nickpan47: can you please take a look at the RB?
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Reviewers: Navina Ramesh <na...@apache.org>, Yi Pan <ni...@gmail.com>, Shanthoosh Venkataraman <sv...@linkedin.com>
Closes #265 from bharathkk/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ef6bb819
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ef6bb819
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ef6bb819
Branch: refs/heads/master
Commit: ef6bb8196c669e651ef6628c7b74a5594713ee82
Parents: a0aae52
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Mon Aug 14 22:02:24 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Aug 14 22:02:24 2017 -0700
----------------------------------------------------------------------
.../samza/runtime/LocalApplicationRunner.java | 80 +++++++++---
.../samza/zk/ZkJobCoordinatorFactory.java | 20 ++-
.../org/apache/samza/zk/ZkProcessorLatch.java | 11 +-
.../runtime/TestLocalApplicationRunner.java | 127 ++++++++++++++++---
4 files changed, 195 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index b0bfc8a..588e657 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaException;
@@ -57,13 +58,12 @@ import org.slf4j.LoggerFactory;
public class LocalApplicationRunner extends AbstractApplicationRunner {
private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
- // Latch id that's used for awaiting the init of application before creating the StreamProcessors
- private static final String INIT_LATCH_ID = "init";
+ private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData";
// Latch timeout is set to 10 min
private static final long LATCH_TIMEOUT_MINUTES = 10;
+ private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000;
private final String uid;
- private final CoordinationUtils coordinationUtils;
private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final AtomicInteger numProcessorsToStart = new AtomicInteger();
@@ -122,9 +122,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
}
}
- if (coordinationUtils != null) {
- coordinationUtils.reset();
- }
shutdownLatch.countDown();
}
}
@@ -132,7 +129,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
public LocalApplicationRunner(Config config) {
super(config);
uid = UUID.randomUUID().toString();
- coordinationUtils = createCoordinationUtils();
}
@Override
@@ -159,10 +155,14 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
try {
// 1. initialize and plan
ExecutionPlan plan = getExecutionPlan(app);
- writePlanJsonFile(plan.getPlanAsJson());
+
+ String executionPlanJson = plan.getPlanAsJson();
+ writePlanJsonFile(executionPlanJson);
// 2. create the necessary streams
- createStreams(plan.getIntermediateStreams());
+ // TODO: System generated intermediate streams should have robust naming scheme. Refer JIRA-1391
+ String planId = String.valueOf(executionPlanJson.hashCode());
+ createStreams(planId, plan.getIntermediateStreams());
// 3. create the StreamProcessors
if (plan.getJobConfigs().isEmpty()) {
@@ -216,7 +216,8 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
// TODO: we will need a better way to package the configs with application runner
if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
ApplicationConfig appConfig = new ApplicationConfig(config);
- return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config);
+ return new ZkCoordinationServiceFactory().getCoordinationService(
+ appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config);
} else {
return null;
}
@@ -227,20 +228,32 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
* If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader
* will create the streams. All the runner processes will wait on the latch that is released after the leader finishes
* stream creation.
+ * @param planId a unique identifier representing the plan used for coordination purpose
* @param intStreams list of intermediate {@link StreamSpec}s
- * @throws Exception exception for latch timeout
+ * @throws TimeoutException exception for latch timeout
*/
- /* package private */ void createStreams(List<StreamSpec> intStreams) throws Exception {
+ /* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
if (!intStreams.isEmpty()) {
+ // Move the scope of coordination utils within stream creation to address long idle connection problem.
+ // Refer SAMZA-1385 for more details
+ CoordinationUtils coordinationUtils = createCoordinationUtils();
if (coordinationUtils != null) {
- Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID);
- LeaderElector leaderElector = coordinationUtils.getLeaderElector();
- leaderElector.setLeaderElectorListener(() -> {
- getStreamManager().createStreams(intStreams);
- initLatch.countDown();
- });
- leaderElector.tryBecomeLeader();
- initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+ Latch initLatch = coordinationUtils.getLatch(1, planId);
+
+ try {
+ // check if the processor needs to go through leader election and stream creation
+ if (shouldContestInElectionForStreamCreation(initLatch)) {
+ LeaderElector leaderElector = coordinationUtils.getLeaderElector();
+ leaderElector.setLeaderElectorListener(() -> {
+ getStreamManager().createStreams(intStreams);
+ initLatch.countDown();
+ });
+ leaderElector.tryBecomeLeader();
+ initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+ }
+ } finally {
+ coordinationUtils.reset();
+ }
} else {
// each application process will try creating the streams, which
// requires stream creation to be idempotent
@@ -272,4 +285,31 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
taskFactory.getClass().getCanonicalName()));
}
}
+
+ /**
+ * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying
+ * the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election.
+ * It creates the following issues whenever a new process joins after the ephemeral node is gone.
+ * 1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle
+ * 2. Underlying systems may not support check for stream existence prior to creation which could have potential problems.
+ * As an interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the
+ * application lifecycle using {@link Latch#await(long, TimeUnit)}
+ *
+ * @param streamCreationLatch latch used for stream creation
+ * @return true if processor needs to be part of election
+ * false otherwise
+ */
+ private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) {
+ boolean eligibleForElection = true;
+
+ try {
+ streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+ // case we didn't time out suggesting that latch already exists
+ eligibleForElection = false;
+ } catch (TimeoutException e) {
+ LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams");
+ }
+
+ return eligibleForElection;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 85e3b4a..08d826e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -22,6 +22,7 @@ package org.apache.samza.zk;
import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -30,9 +31,13 @@ import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
+ private static final String JOB_COORDINATOR_ZK_PATH_FORMAT = "%s/%s-%s-coordinationData";
+ private static final String DEFAULT_JOB_ID = "1";
+ private static final String DEFAULT_JOB_NAME = "defaultJob";
/**
* Method to instantiate an implementation of JobCoordinator
@@ -50,8 +55,21 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
ZkConfig zkConfig = new ZkConfig(config);
- ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
+ ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config));
ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
}
+
+ private String getJobCoordinationZkPath(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ String appId = new ApplicationConfig(config).getGlobalAppId();
+ String jobName = jobConfig.getName().isDefined()
+ ? jobConfig.getName().get()
+ : DEFAULT_JOB_NAME;
+ String jobId = jobConfig.getJobId().isDefined()
+ ? jobConfig.getJobId().get()
+ : DEFAULT_JOB_ID;
+
+ return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
index decdd7d..166c627 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.samza.coordinator.Latch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +54,14 @@ public class ZkProcessorLatch implements Latch {
}
@Override
- public void await(long timeout, TimeUnit timeUnit) {
- zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
+ public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ // waitUntilExists signals timeout by returning false as opposed to throwing exception. We internally need to map
+ // the non-existence to a TimeoutException in order to respect the contract defined in Latch interface
+ boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
+
+ if (!targetPathExists) {
+ throw new TimeoutException("Timed out waiting for the targetPath");
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index a04bd3b..6c7827e 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -19,6 +19,15 @@
package org.apache.samza.runtime;
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
@@ -38,31 +47,31 @@ import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestLocalApplicationRunner {
+ private static final String PLAN_JSON = "{"
+ + "\"jobs\":[{"
+ + "\"jobName\":\"test-application\","
+ + "\"jobId\":\"1\","
+ + "\"operatorGraph\":{"
+ + "\"intermediateStreams\":{%s},"
+ + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
+ private static final String STREAM_SPEC_JSON_FORMAT = "\"%s\":{"
+ + "\"streamSpec\":{"
+ + "\"id\":\"%s\","
+ + "\"systemName\":\"%s\","
+ + "\"physicalName\":\"%s\","
+ + "\"partitionCount\":2},"
+ + "\"sourceJobs\":[\"test-app\"],"
+ + "\"targetJobs\":[\"test-target-app\"]},";
+
@Test
public void testStreamCreation() throws Exception {
Map<String, String> config = new HashMap<>();
@@ -180,8 +189,10 @@ public class TestLocalApplicationRunner {
@Override
public void await(long timeout, TimeUnit tu)
throws TimeoutException {
- // in this test, latch is released before wait
- assertTrue(done);
+ // in this test, latch is released after countDown is invoked
+ if (!done) {
+ throw new TimeoutException("timed out waiting for the target path");
+ }
}
@Override
@@ -343,4 +354,80 @@ public class TestLocalApplicationRunner {
assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish);
}
+ /**
+ * A test case to verify if the plan results in different hash if there is change in topological sort order.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+ */
+ @Test
+ public void testPlanIdWithShuffledStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(
+ new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+ List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+ assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle,
+ getExecutionPlanId(shuffledStreamSpecs));
+ }
+
+ /**
+ * A test case to verify if the plan results in same hash in case of same plan.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+ */
+ @Test
+ public void testGeneratePlanIdWithSameStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(
+ new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
+ String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);
+
+ assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt);
+ assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt);
+ }
+
+ /**
+ * A test case to verify plan results in different hash in case of different intermediate stream.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+ */
+ @Test
+ public void testGeneratePlanIdWithDifferentStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(
+ new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+ List<StreamSpec> updatedStreamSpecs = ImmutableList.of(
+ new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-4", "stream-4", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+ assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle,
+ getExecutionPlanId(updatedStreamSpecs));
+ }
+
+ private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
+ String intermediateStreamJson = updatedStreamSpecs.stream()
+ .map(this::streamSpecToJson)
+ .collect(Collectors.joining(","));
+
+ int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
+
+ return String.valueOf(planId);
+ }
+
+ private String streamSpecToJson(StreamSpec streamSpec) {
+ return String.format(STREAM_SPEC_JSON_FORMAT,
+ streamSpec.getId(),
+ streamSpec.getId(),
+ streamSpec.getSystemName(),
+ streamSpec.getPhysicalName());
+ }
}