Posted to commits@samza.apache.org by na...@apache.org on 2017/08/15 05:02:36 UTC

samza git commit: SAMZA-1385: Fix zookeeper path conflict between LocalApplicationRunner and ZkJobCoordinator

Repository: samza
Updated Branches:
  refs/heads/master a0aae5292 -> ef6bb8196


SAMZA-1385: Fix zookeeper path conflict between LocalApplicationRunner and ZkJobCoordinator

Tested the fix with the sample page-view/ad-click joiner job.
navina sborya nickpan47, can you please take a look at the RB?

Author: Bharath Kumarasubramanian <bk...@linkedin.com>

Reviewers: Navina Ramesh <na...@apache.org>, Yi Pan <ni...@gmail.com>, Shanthoosh Venkataraman <sv...@linkedin.com>

Closes #265 from bharathkk/master
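
For readers skimming the patch: the conflict comes from both LocalApplicationRunner and ZkJobCoordinator rooting their ZooKeeper nodes at the same global app id. After this change the two sides use disjoint sub-paths. The sketch below is illustrative only (the class and helper names are not part of the patch); it simply restates the two path rules introduced in LocalApplicationRunner and ZkJobCoordinatorFactory.

// Illustrative sketch, not part of the patch: how the runner and the job coordinator
// now derive disjoint ZooKeeper namespaces from the same global app id.
public final class ZkPathLayoutSketch {
  // Mirrors APPLICATION_RUNNER_ZK_PATH_SUFFIX in LocalApplicationRunner.
  private static final String RUNNER_SUFFIX = "/ApplicationRunnerData";
  // Mirrors JOB_COORDINATOR_ZK_PATH_FORMAT in ZkJobCoordinatorFactory.
  private static final String COORDINATOR_FORMAT = "%s/%s-%s-coordinationData";

  static String runnerPath(String globalAppId) {
    return globalAppId + RUNNER_SUFFIX;
  }

  static String coordinatorPath(String globalAppId, String jobName, String jobId) {
    return String.format(COORDINATOR_FORMAT, globalAppId, jobName, jobId);
  }

  public static void main(String[] args) {
    String appId = "my-app"; // hypothetical app id
    System.out.println(runnerPath(appId));                     // my-app/ApplicationRunnerData
    System.out.println(coordinatorPath(appId, "my-job", "1")); // my-app/my-job-1-coordinationData
  }
}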


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ef6bb819
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ef6bb819
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ef6bb819

Branch: refs/heads/master
Commit: ef6bb8196c669e651ef6628c7b74a5594713ee82
Parents: a0aae52
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Mon Aug 14 22:02:24 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Aug 14 22:02:24 2017 -0700

----------------------------------------------------------------------
 .../samza/runtime/LocalApplicationRunner.java   |  80 +++++++++---
 .../samza/zk/ZkJobCoordinatorFactory.java       |  20 ++-
 .../org/apache/samza/zk/ZkProcessorLatch.java   |  11 +-
 .../runtime/TestLocalApplicationRunner.java     | 127 ++++++++++++++++---
 4 files changed, 195 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index b0bfc8a..588e657 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.SamzaException;
@@ -57,13 +58,12 @@ import org.slf4j.LoggerFactory;
 public class LocalApplicationRunner extends AbstractApplicationRunner {
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
-  // Latch id that's used for awaiting the init of application before creating the StreamProcessors
-  private static final String INIT_LATCH_ID = "init";
+  private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData";
   // Latch timeout is set to 10 min
   private static final long LATCH_TIMEOUT_MINUTES = 10;
+  private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000;
 
   private final String uid;
-  private final CoordinationUtils coordinationUtils;
   private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
@@ -122,9 +122,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
         }
       }
 
-      if (coordinationUtils != null) {
-        coordinationUtils.reset();
-      }
       shutdownLatch.countDown();
     }
   }
@@ -132,7 +129,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   public LocalApplicationRunner(Config config) {
     super(config);
     uid = UUID.randomUUID().toString();
-    coordinationUtils = createCoordinationUtils();
   }
 
   @Override
@@ -159,10 +155,14 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     try {
       // 1. initialize and plan
       ExecutionPlan plan = getExecutionPlan(app);
-      writePlanJsonFile(plan.getPlanAsJson());
+
+      String executionPlanJson = plan.getPlanAsJson();
+      writePlanJsonFile(executionPlanJson);
 
       // 2. create the necessary streams
-      createStreams(plan.getIntermediateStreams());
+      // TODO: System-generated intermediate streams should have a robust naming scheme. Refer to SAMZA-1391.
+      String planId = String.valueOf(executionPlanJson.hashCode());
+      createStreams(planId, plan.getIntermediateStreams());
 
       // 3. create the StreamProcessors
       if (plan.getJobConfigs().isEmpty()) {
@@ -216,7 +216,8 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     // TODO: we will need a better way to package the configs with application runner
     if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
       ApplicationConfig appConfig = new ApplicationConfig(config);
-      return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config);
+      return new ZkCoordinationServiceFactory().getCoordinationService(
+          appConfig.getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX, uid, config);
     } else {
       return null;
     }
@@ -227,20 +228,32 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
    * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader
    * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes
    * stream creation.
+   * @param planId a unique identifier representing the plan, used for coordination purposes
    * @param intStreams list of intermediate {@link StreamSpec}s
-   * @throws Exception exception for latch timeout
+   * @throws TimeoutException if waiting on the stream creation latch times out
    */
-  /* package private */ void createStreams(List<StreamSpec> intStreams) throws Exception {
+  /* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
     if (!intStreams.isEmpty()) {
+      // Limit the scope of the coordination utils to stream creation to address the long idle connection problem.
+      // Refer to SAMZA-1385 for more details.
+      CoordinationUtils coordinationUtils = createCoordinationUtils();
       if (coordinationUtils != null) {
-        Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID);
-        LeaderElector leaderElector = coordinationUtils.getLeaderElector();
-        leaderElector.setLeaderElectorListener(() -> {
-            getStreamManager().createStreams(intStreams);
-            initLatch.countDown();
-          });
-        leaderElector.tryBecomeLeader();
-        initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+        Latch initLatch = coordinationUtils.getLatch(1, planId);
+
+        try {
+          // check if the processor needs to go through leader election and stream creation
+          if (shouldContestInElectionForStreamCreation(initLatch)) {
+            LeaderElector leaderElector = coordinationUtils.getLeaderElector();
+            leaderElector.setLeaderElectorListener(() -> {
+                getStreamManager().createStreams(intStreams);
+                initLatch.countDown();
+              });
+            leaderElector.tryBecomeLeader();
+            initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+          }
+        } finally {
+          coordinationUtils.reset();
+        }
       } else {
         // each application process will try creating the streams, which
         // requires stream creation to be idempotent
@@ -272,4 +285,31 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
           taskFactory.getClass().getCanonicalName()));
     }
   }
+
+  /**
+   * In order to fix SAMZA-1385, we limit the scope of the coordination util to the stream creation phase and destroy
+   * the coordination util right after. Closing the zk connection cleans up the ephemeral node used for leader election,
+   * which creates the following issues whenever a new process joins after the ephemeral node is gone:
+   *    1. It is unnecessary to re-run leader election for stream creation within the same application lifecycle.
+   *    2. Underlying systems may not support checking for stream existence prior to creation, which can cause problems.
+   * As an interim solution, we reuse the same latch as a marker to determine whether the stream creation phase is done
+   * for the application lifecycle, using {@link Latch#await(long, TimeUnit)}.
+   *
+   * @param streamCreationLatch latch used for stream creation
+   * @return true if processor needs to be part of election
+   *         false otherwise
+   */
+  private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) {
+    boolean eligibleForElection = true;
+
+    try {
+      streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+      // If await did not time out, the latch already exists and stream creation has already completed
+      eligibleForElection = false;
+    } catch (TimeoutException e) {
+      LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams");
+    }
+
+    return eligibleForElection;
+  }
 }
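
A condensed sketch of the stream-creation flow above, with Samza's Latch and LeaderElector stubbed out as plain interfaces for illustration (this is not the patch itself): each runner polls the per-plan latch with a short timeout and only contests leader election when that poll times out, i.e. when no earlier process in this application lifecycle has already created the streams.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: a condensed version of LocalApplicationRunner.createStreams.
// The interfaces below are stand-ins for Samza's Latch and LeaderElector.
public class StreamCreationFlowSketch {
  interface Latch {
    void await(long timeout, TimeUnit unit) throws TimeoutException;
    void countDown();
  }

  interface LeaderElector {
    void setLeaderElectorListener(Runnable onBecomingLeader);
    void tryBecomeLeader();
  }

  static void createStreamsOnce(Latch planLatch, LeaderElector elector, Runnable createStreams)
      throws TimeoutException {
    try {
      // Short poll: if the per-plan latch already exists, an earlier process in this
      // application lifecycle created the streams and there is nothing left to do.
      planLatch.await(1000, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // No one has created the streams yet: contest leadership. The winner creates
      // the streams and releases the latch; everyone else waits on it (up to 10 min).
      elector.setLeaderElectorListener(() -> {
          createStreams.run();
          planLatch.countDown();
        });
      elector.tryBecomeLeader();
      planLatch.await(10, TimeUnit.MINUTES);
    }
    // The real code additionally tears down the CoordinationUtils in a finally block,
    // which is what closes the zk connection and removes the ephemeral election node.
  }
}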

http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 85e3b4a..08d826e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -22,6 +22,7 @@ package org.apache.samza.zk;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -30,9 +31,13 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
+  private static final String JOB_COORDINATOR_ZK_PATH_FORMAT = "%s/%s-%s-coordinationData";
+  private static final String DEFAULT_JOB_ID = "1";
+  private static final String DEFAULT_JOB_NAME = "defaultJob";
 
   /**
    * Method to instantiate an implementation of JobCoordinator
@@ -50,8 +55,21 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
 
   private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
     ZkConfig zkConfig = new ZkConfig(config);
-    ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config));
     ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
     return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
   }
+
+  private String getJobCoordinationZkPath(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    String appId = new ApplicationConfig(config).getGlobalAppId();
+    String jobName = jobConfig.getName().isDefined()
+        ? jobConfig.getName().get()
+        : DEFAULT_JOB_NAME;
+    String jobId = jobConfig.getJobId().isDefined()
+        ? jobConfig.getJobId().get()
+        : DEFAULT_JOB_ID;
+
+    return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId);
+  }
 }
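
To make the fallback above concrete, here is a small illustrative sketch (not part of the patch) using java.util.Optional in place of the Scala Options returned by JobConfig.getName()/getJobId(); the example app and job names are hypothetical.

import java.util.Optional;

// Illustrative only: mirrors ZkJobCoordinatorFactory.getJobCoordinationZkPath.
final class CoordinationPathSketch {
  private static final String FORMAT = "%s/%s-%s-coordinationData";

  static String path(String appId, Optional<String> jobName, Optional<String> jobId) {
    return String.format(FORMAT, appId, jobName.orElse("defaultJob"), jobId.orElse("1"));
  }

  public static void main(String[] args) {
    // With job.name and job.id configured (hypothetical values):
    System.out.println(path("my-app", Optional.of("page-view-joiner"), Optional.of("i001")));
    // -> my-app/page-view-joiner-i001-coordinationData

    // Without them, the factory falls back to DEFAULT_JOB_NAME / DEFAULT_JOB_ID:
    System.out.println(path("my-app", Optional.empty(), Optional.empty()));
    // -> my-app/defaultJob-1-coordinationData
  }
}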

http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
index decdd7d..166c627 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkProcessorLatch.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
 
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.TimeoutException;
 import org.apache.samza.coordinator.Latch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +54,14 @@ public class ZkProcessorLatch implements Latch {
   }
 
   @Override
-  public void await(long timeout, TimeUnit timeUnit) {
-    zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
+  public void await(long timeout, TimeUnit timeUnit) throws TimeoutException {
+    // waitUntilExists signals a timeout by returning false rather than throwing an exception. We map the
+    // absence of the target path to a TimeoutException in order to respect the contract defined by the Latch interface.
+    boolean targetPathExists = zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
+
+    if (!targetPathExists) {
+      throw new TimeoutException("Timed out waiting for the targetPath");
+    }
   }
 
   @Override
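
Because Latch#await now surfaces a TimeoutException instead of returning silently, callers have to handle the timeout explicitly. A minimal usage sketch follows; the helper class and method below are hypothetical and not part of the patch.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.samza.coordinator.Latch;

// Hypothetical caller illustrating the new await contract: a timeout is now an
// exception rather than a silent return.
final class LatchAwaitUsageSketch {
  static boolean latchReleasedWithin(Latch latch, long timeout, TimeUnit unit) {
    try {
      latch.await(timeout, unit);
      return true;   // the target path appeared before the timeout
    } catch (TimeoutException e) {
      return false;  // timed out: the latch was never counted down
    }
  }
}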

http://git-wip-us.apache.org/repos/asf/samza/blob/ef6bb819/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index a04bd3b..6c7827e 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -19,6 +19,15 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
@@ -38,31 +47,31 @@ import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 public class TestLocalApplicationRunner {
 
+  private static final String PLAN_JSON = "{"
+      + "\"jobs\":[{"
+      + "\"jobName\":\"test-application\","
+      + "\"jobId\":\"1\","
+      + "\"operatorGraph\":{"
+      + "\"intermediateStreams\":{%s},"
+      + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
+  private static final String STREAM_SPEC_JSON_FORMAT = "\"%s\":{"
+      + "\"streamSpec\":{"
+      + "\"id\":\"%s\","
+      + "\"systemName\":\"%s\","
+      + "\"physicalName\":\"%s\","
+      + "\"partitionCount\":2},"
+      + "\"sourceJobs\":[\"test-app\"],"
+      + "\"targetJobs\":[\"test-target-app\"]},";
+
   @Test
   public void testStreamCreation() throws Exception {
     Map<String, String> config = new HashMap<>();
@@ -180,8 +189,10 @@ public class TestLocalApplicationRunner {
       @Override
       public void await(long timeout, TimeUnit tu)
           throws TimeoutException {
-        // in this test, latch is released before wait
-        assertTrue(done);
+        // in this test, latch is released after countDown is invoked
+        if (!done) {
+          throw new TimeoutException("timed out waiting for the target path");
+        }
       }
 
       @Override
@@ -343,4 +354,80 @@ public class TestLocalApplicationRunner {
     assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish);
   }
 
+  /**
+   * A test case to verify that the plan results in a different hash when the topological sort order changes.
+   * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+   */
+  @Test
+  public void testPlanIdWithShuffledStreamSpecs() {
+    List<StreamSpec> streamSpecs = ImmutableList.of(
+        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+    String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+    List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(
+        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+    assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle,
+        getExecutionPlanId(shuffledStreamSpecs));
+  }
+
+  /**
+   * A test case to verify that the plan results in the same hash for the same plan.
+   * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+   */
+  @Test
+  public void testGeneratePlanIdWithSameStreamSpecs() {
+    List<StreamSpec> streamSpecs = ImmutableList.of(
+        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+    String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
+    String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);
+
+    assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt);
+    assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt);
+  }
+
+  /**
+   * A test case to verify that the plan results in a different hash when the intermediate streams differ.
+   * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams for the sake of these test cases.
+   */
+  @Test
+  public void testGeneratePlanIdWithDifferentStreamSpecs() {
+    List<StreamSpec> streamSpecs = ImmutableList.of(
+        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+    String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+    List<StreamSpec> updatedStreamSpecs = ImmutableList.of(
+        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-4", "stream-4", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+    assertNotEquals("Expected both of the latch ids to be different", planIdBeforeShuffle,
+        getExecutionPlanId(updatedStreamSpecs));
+  }
+
+  private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
+    String intermediateStreamJson = updatedStreamSpecs.stream()
+        .map(this::streamSpecToJson)
+        .collect(Collectors.joining(","));
+
+    int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
+
+    return String.valueOf(planId);
+  }
+
+  private String streamSpecToJson(StreamSpec streamSpec) {
+    return String.format(STREAM_SPEC_JSON_FORMAT,
+        streamSpec.getId(),
+        streamSpec.getId(),
+        streamSpec.getSystemName(),
+        streamSpec.getPhysicalName());
+  }
 }