You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/05/22 21:09:16 UTC
[samza] branch master updated: SAMZA-1531: Support run.id in standalone for batch processing.
This is an automated email from the ASF dual-hosted git repository.
jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6d5f446 SAMZA-1531: Support run.id in standalone for batch processing.
6d5f446 is described below
commit 6d5f4461d0fa2485ea1274d051f57c5983df5f2e
Author: Manasa <mg...@linkedin.com>
AuthorDate: Wed May 22 14:09:08 2019 -0700
SAMZA-1531: Support run.id in standalone for batch processing.
Adds run.id support to standalone for BATCH mode only; STREAM mode is unchanged.
Known issues:
- Azure is not supported in this PR; a follow-up PR will add support. However, this PR does not break Azure/Passthrough, which fall back to the current behavior.
- Upon an unclean shutdown of the job, a restart before the ZK server session timeout expires results in reuse of the old run id.
Author: Manasa <mg...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>, Bharath <bk...@linkedin.com>
Closes #938 from lakshmi-manasa-g/standaloneRunid
---
.../samza/coordinator/AzureCoordinationUtils.java | 19 +-
.../org/apache/samza/coordinator/AzureLock.java | 11 +-
.../samza/coordinator/ClusterMembership.java | 60 ++++
...ckWithState.java => CoordinationConstants.java} | 26 +-
.../samza/coordinator/CoordinationUtils.java | 9 +-
.../apache/samza/coordinator/DistributedLock.java | 11 +-
.../apache/samza/coordinator/RunIdGenerator.java | 110 +++++++
.../apache/samza/execution/LocalJobPlanner.java | 126 ++++++--
.../samza/runtime/LocalApplicationRunner.java | 93 +++++-
.../org/apache/samza/zk/ZkClusterMembership.java | 71 +++++
.../org/apache/samza/zk/ZkCoordinationUtils.java | 10 +-
.../samza/zk/ZkCoordinationUtilsFactory.java | 4 +-
.../org/apache/samza/zk/ZkDistributedLock.java | 94 +++---
.../apache/samza/zk/ZkMetadataStoreFactory.java | 6 +-
.../samza/coordinator/TestRunIdGenerator.java | 93 ++++++
.../samza/execution/TestLocalJobPlanner.java | 34 ++-
.../samza/runtime/TestLocalApplicationRunner.java | 122 +++++++-
.../apache/samza/zk/TestZkClusterMembership.java | 133 +++++++++
.../org/apache/samza/zk/TestZkDistributedLock.java | 126 ++++++++
.../org/apache/samza/zk/TestZkMetadataStore.java | 2 +-
.../processor/TestZkLocalApplicationRunner.java | 321 +++++++++++++++++++--
21 files changed, 1323 insertions(+), 158 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index f50ab72..eaac8e5 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -22,7 +22,6 @@ package org.apache.samza.coordinator;
import org.apache.samza.AzureClient;
import org.apache.samza.config.AzureConfig;
import org.apache.samza.config.Config;
-import org.apache.samza.util.BlobUtils;
public class AzureCoordinationUtils implements CoordinationUtils {
@@ -45,11 +44,21 @@ public class AzureCoordinationUtils implements CoordinationUtils {
return null;
}
+ /**
+ * To support DistributedLock in Azure, {@link org.apache.samza.metadatastore.MetadataStore} must be implemented as well,
+ * because both are used in {@link org.apache.samza.execution.LocalJobPlanner} for intermediate stream creation.
+ * Currently MetadataStore defaults to ZkMetadataStore in LocalJobPlanner because `metadata.store.factory` is not exposed.
+ * So, to avoid coupling AzureLock with ZkMetadataStore, DistributedLock is not supported for Azure.
+ * See SAMZA-2180 for more details.
+ */
@Override
- public DistributedLockWithState getLockWithState(String lockId) {
- BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(),
- azureConfig.getAzureBlobName() + lockId, azureConfig.getAzureBlobLength());
- return new AzureLock(blob);
+ public DistributedLock getLock(String lockId) {
+ throw new UnsupportedOperationException("DistributedLock not supported in Azure!");
+ }
+
+ @Override
+ public ClusterMembership getClusterMembership() {
+ throw new UnsupportedOperationException("ClusterMembership not supported in Azure!");
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
index 8cddc4c..22cdf2e 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -19,8 +19,8 @@
package org.apache.samza.coordinator;
+import java.time.Duration;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.AzureException;
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
/**
* Distributed lock primitive for Azure.
*/
-public class AzureLock implements DistributedLockWithState {
+public class AzureLock implements DistributedLock {
private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
private static final int LEASE_TIME_IN_SEC = 60;
@@ -51,14 +51,13 @@ public class AzureLock implements DistributedLockWithState {
* Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out.
* The lock is acquired when the blob is leased successfully.
* @param timeout Duration after which timeout occurs.
- * @param unit Time Unit of the timeout defined above.
* @return true if the lock was acquired successfully, false if lock acquire operation is unsuccessful even after subsequent tries within the timeout range.
*/
@Override
- public boolean lockIfNotSet(long timeout, TimeUnit unit) {
+ public boolean lock(Duration timeout) {
//Start timer for timeout
long startTime = System.currentTimeMillis();
- long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ long lockTimeout = timeout.toMillis();
Random random = new Random();
while ((System.currentTimeMillis() - startTime) < lockTimeout) {
@@ -87,7 +86,7 @@ public class AzureLock implements DistributedLockWithState {
* Unlocks, by releasing the lease on the blob.
*/
@Override
- public void unlockAndSet() {
+ public void unlock() {
boolean status = leaseBlobManager.releaseLease(leaseId.get());
if (status) {
LOG.info("Unlocked successfully.");
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java b/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java
new file mode 100644
index 0000000..45b6fd3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Coordination primitive that maintains the list of processors in the quorum.
+ *
+ * Guarantees:
+ * 1. operations are linearizable
+ * 2. registration persistence in the absence of connection errors
+ *
+ * Non-guarantees:
+ * 1. thread safety
+ * 2. concurrent access of the list of processors in the quorum
+ * 3. persistence of registration across connection errors
+ * 4. processorId as indicator of registration order
+ *
+ * Implementor responsibilities:
+ * 1. registerProcessor returns a unique processorId
+ * 2. getNumberOfProcessors by a processor should reflect at least its own registration status
+ * 3. unregisterProcessor for a null or unregistered processorId is a no-op
+ */
+@InterfaceStability.Evolving
+public interface ClusterMembership {
+ /**
+ * Adds the calling processor to the list of processors in the quorum.
+ * @return unique id of this processor registration
+ */
+ String registerProcessor();
+
+ /**
+ * @return number of processors currently registered in the quorum
+ */
+ int getNumberOfProcessors();
+
+ /**
+ * Removes a processor from the list of processors in the quorum.
+ * @param processorId id returned by {@link #registerProcessor()} for the registration to remove
+ */
+ void unregisterProcessor(String processorId);
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
similarity index 54%
rename from samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
rename to samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index c8e9033..d7a648b 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -19,24 +19,12 @@
package org.apache.samza.coordinator;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+public final class CoordinationConstants {
+ private CoordinationConstants() {}
-public interface DistributedLockWithState {
-
- /**
- * Try to acquire the lock, but first check if the state flag is set. If it is set, return false.
- * If the flag is not set, and lock is acquired - return true.
- * @param timeout Duration of lock acquiring timeout.
- * @param unit Time Unit of the timeout defined above.
- * @return true if lock is acquired successfully, false if state is already set.
- * @throws TimeoutException if could not acquire the lock.
- */
- boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;
-
- /**
- * Release the lock and set the state
- */
- void unlockAndSet();
-}
\ No newline at end of file
+ public static final String RUNID_STORE_KEY = "runId"; // metadata store key under which the run id is written/read
+ public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData"; // coordination id passed to getCoordinationUtils
+ public static final String RUNID_LOCK_ID = "runId"; // id of the DistributedLock guarding run id generation
+ public static final int LOCK_TIMEOUT_MS = 300000; // 5 minutes; timeout used when acquiring coordination locks
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
index 9ebd2e2..81507c9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
@@ -24,10 +24,11 @@ import org.apache.samza.annotation.InterfaceStability;
*
* Coordination service provides synchronization primitives.
* The actual implementation (for example ZK based) is left to each implementation class.
- * This service provide three primitives:
+ * This service provides the following primitives:
* - LeaderElection
* - Latch
- * - LockWithState (does not lock if state is set)
+ * - Lock
+ * - ClusterMembership (to track the processors registered in the quorum)
*/
@InterfaceStability.Evolving
public interface CoordinationUtils {
@@ -37,7 +38,9 @@ public interface CoordinationUtils {
Latch getLatch(int size, String latchId);
- DistributedLockWithState getLockWithState(String lockId);
+ DistributedLock getLock(String lockId); // may throw UnsupportedOperationException (e.g. AzureCoordinationUtils)
+
+ ClusterMembership getClusterMembership(); // may throw UnsupportedOperationException (e.g. AzureCoordinationUtils)
/**
* utilites cleanup
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java
similarity index 78%
rename from samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
rename to samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java
index 6972cd9..a605656 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java
@@ -19,21 +19,20 @@
package org.apache.samza.coordinator;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
public interface DistributedLock {
/**
- * Tries to acquire the lock
+ * Tries to acquire the lock within the given timeout.
* @param timeout Duration of lock acquiring timeout.
- * @param unit Time Unit of the timeout defined above.
- * @return true if lock is acquired successfully, false if it times out.
+ * @return true if the lock was acquired, false if it could not be acquired within the timeout
*/
- boolean lock(long timeout, TimeUnit unit);
+ boolean lock(Duration timeout);
/**
- * Releases the lock
+ * Releases the lock.
*/
void unlock();
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
new file mode 100644
index 0000000..284c0bf
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.base.Preconditions;
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generates RunId for Standalone use case
+ * If there is only one processor in the quorum (registered with ClusterMembership) then create new runid and add to store
+ * Else read runid from the store
+ *
+ * Steps to generate:
+ * 1. acquire lock
+ * 2. add self to quorum (register itself with ClusterMembership)
+ * 3. get number of processors in quorum
+ * 4. if quorum size is 1 (only self) then create new runid and write to store
+ * 5. if quorum size is greater than 1 then read runid from store
+ * 6. unlock
+ */
+public class RunIdGenerator {
+ private static final Logger LOG = LoggerFactory.getLogger(RunIdGenerator.class);
+
+ private final CoordinationUtils coordinationUtils;
+ private final MetadataStore metadataStore;
+ private final ClusterMembership clusterMembership;
+ private String processorId = null;
+ private volatile boolean closed = false;
+
+ public RunIdGenerator(CoordinationUtils coordinationUtils, MetadataStore metadataStore) {
+ Preconditions.checkNotNull(coordinationUtils, "CoordinationUtils cannot be null");
+ Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
+ this.coordinationUtils = coordinationUtils;
+ this.metadataStore = metadataStore;
+ this.clusterMembership = coordinationUtils.getClusterMembership();
+ Preconditions.checkNotNull(this.clusterMembership, "Failed to create utils for run id generation");
+ }
+
+ /**
+ * Acquires the run.id lock, registers this processor with {@link ClusterMembership} and then either generates a
+ * new run id (when this is the only registered processor) or reads the existing one from the metadata store.
+ * Once acquired, the lock is released before this method returns or throws.
+ * @return the run id for this run
+ * @throws SamzaException if the lock is unavailable or not acquired within the timeout, the processor count is 0,
+ *         or no run id is stored although other processors are registered
+ */
+ public Optional<String> getRunId() {
+ DistributedLock runIdLock;
+ String runId = null;
+
+ runIdLock = coordinationUtils.getLock(CoordinationConstants.RUNID_LOCK_ID);
+ if (runIdLock == null) {
+ throw new SamzaException("Failed to create utils for run id generation");
+ }
+
+ try {
+ // acquire lock to write or read run.id
+ if (!runIdLock.lock(Duration.ofMillis(CoordinationConstants.LOCK_TIMEOUT_MS))) {
+ throw new SamzaException("Processor timed out waiting to acquire lock for run.id generation");
+ }
+ try {
+ LOG.info("lock acquired for run.id generation by this processor");
+ processorId = clusterMembership.registerProcessor();
+ int numberOfActiveProcessors = clusterMembership.getNumberOfProcessors();
+ if (numberOfActiveProcessors == 0) {
+ throw new SamzaException("Processor failed to fetch number of processors for run.id generation");
+ }
+ if (numberOfActiveProcessors == 1) {
+ runId =
+ String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
+ LOG.info("Writing the run id for this run as {}", runId);
+ metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8"));
+ } else {
+ byte[] storedRunId = metadataStore.get(CoordinationConstants.RUNID_STORE_KEY);
+ if (storedRunId == null) {
+ throw new SamzaException("Processor found no run.id in the store although other processors are registered");
+ }
+ runId = new String(storedRunId, "UTF-8");
+ LOG.info("Read the run id for this run as {}", runId);
+ }
+ } finally {
+ // release even when registration or store access fails, so other processors are not blocked until timeout
+ runIdLock.unlock();
+ }
+ } catch (UnsupportedEncodingException e) {
+ throw new SamzaException("Processor could not serialize/deserialize string for run.id generation", e);
+ }
+ return Optional.ofNullable(runId);
+ }
+
+ /**
+ * might be called several times and hence should be idempotent
+ */
+ public void close() {
+ if (!closed && processorId != null) {
+ closed = true;
+ clusterMembership.unregisterProcessor(processorId);
+ }
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 5cce6c5..48a4a3e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -18,8 +18,9 @@
*/
package org.apache.samza.execution;
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.samza.SamzaException;
@@ -28,9 +29,15 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
+import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,19 +50,35 @@ import org.slf4j.LoggerFactory;
*/
public class LocalJobPlanner extends JobPlanner {
private static final Logger LOG = LoggerFactory.getLogger(LocalJobPlanner.class);
- private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData";
+ private static final String STREAM_CREATION_METADATA_STORE = "StreamCreationCoordinationStore";
+ private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory";
+ public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();
+ private static final String STREAM_CREATED_STATE_KEY = "StreamCreated_%s";
- private final String uid = UUID.randomUUID().toString();
+ private final String processorId;
+ private final CoordinationUtils coordinationUtils;
+ private final String runId;
- public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+ public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor, String processorId) {
super(descriptor);
+ this.processorId = processorId;
+ JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig);
+ this.coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, processorId, userConfig);
+ this.runId = null;
+ }
+
+ public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor, CoordinationUtils coordinationUtils, String processorId, String runId) {
+ super(descriptor);
+ this.coordinationUtils = coordinationUtils;
+ this.processorId = processorId;
+ this.runId = runId;
}
@Override
public List<JobConfig> prepareJobs() {
// for high-level DAG, generating the plan and job configs
// 1. initialize and plan
- ExecutionPlan plan = getExecutionPlan();
+ ExecutionPlan plan = getExecutionPlan(runId);
String executionPlanJson = "";
try {
@@ -104,36 +127,108 @@ public class LocalJobPlanner extends JobPlanner {
LOG.info("Set of intermediate streams is empty. Nothing to create.");
return;
}
- LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
+ LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", processorId);
// Move the scope of coordination utils within stream creation to address long idle connection problem.
// Refer SAMZA-1385 for more details
- JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig);
- String coordinationId = new ApplicationConfig(userConfig).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
- CoordinationUtils coordinationUtils =
- jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, userConfig);
if (coordinationUtils == null) {
- LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
+ LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", processorId);
// each application process will try creating the streams, which
// requires stream creation to be idempotent
streamManager.createStreams(intStreams);
return;
}
- DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId);
+ // If BATCH, then need to create new intermediate streams every run.
+ // planId does not change every run and hence, need to use runid
+ // as the lockId to create a new lock with state each run
+ // to create new streams each run.
+ // If run.id is null, defaults to old behavior of using planId
+ boolean isAppModeBatch = new ApplicationConfig(userConfig).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+ String lockId = planId;
+ if (isAppModeBatch && runId != null) {
+ lockId = runId;
+ }
try {
- // check if the processor needs to go through leader election and stream creation
- if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
- LOG.info("lock acquired for streams creation by " + uid);
- streamManager.createStreams(intStreams);
- lockWithState.unlockAndSet();
- } else {
- LOG.info("Processor {} did not obtain the lock for streams creation. They must've been created by another processor.", uid);
- }
- } catch (TimeoutException e) {
- String msg = String.format("Processor {} failed to get the lock for stream initialization", uid);
- throw new SamzaException(msg, e);
+ checkAndCreateStreams(lockId, intStreams, streamManager);
+ } catch (TimeoutException te) {
+ throw new SamzaException(String.format("Processor %s failed to get the lock for stream initialization within timeout.", processorId), te);
} finally {
- coordinationUtils.close();
+ if (!isAppModeBatch && coordinationUtils != null) {
+ coordinationUtils.close();
+ }
+ }
+ }
+
+ /**
+ * Creates the intermediate streams from a single processor, coordinated through the lock identified by lockId
+ * and a "streams created" marker in the metadata store. If no lock or metadata store is available (including
+ * CoordinationUtils that do not support locks), every processor creates the streams, which requires stream
+ * creation to be idempotent.
+ * @throws TimeoutException if the streams were neither found nor created within LOCK_TIMEOUT_MS
+ */
+ private void checkAndCreateStreams(String lockId, List<StreamSpec> intStreams, StreamManager streamManager) throws TimeoutException {
+ DistributedLock distributedLock;
+ try {
+ distributedLock = coordinationUtils.getLock(lockId);
+ } catch (UnsupportedOperationException e) {
+ // e.g. AzureCoordinationUtils does not support DistributedLock; fall back to uncoordinated creation below
+ distributedLock = null;
+ }
+ // Only build the metadata store when a lock is available, so lock-less backends never need one
+ MetadataStore metadataStore = distributedLock == null ? null : getMetadataStore();
+ if (distributedLock == null || metadataStore == null) {
+ LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", processorId);
+ // each application process will try creating the streams, which requires stream creation to be idempotent
+ streamManager.createStreams(intStreams);
+ return;
+ }
+ String streamCreatedStateKey = String.format(STREAM_CREATED_STATE_KEY, lockId);
+ //Start timer for timeout
+ long startTime = System.currentTimeMillis();
+ long lockTimeout = TimeUnit.MILLISECONDS.convert(CoordinationConstants.LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ // If the "streams created" state already exists in the store, skip stream creation.
+ // Otherwise acquire the lock, create the streams, record the state in the store and unlock.
+ // The state is checked before acquiring the lock so that not every processor has to take the lock.
+ // The loop ensures that a processor which loses the race for the lock does not fail with a
+ // timeout; instead it comes back, re-checks the state and proceeds once the streams exist,
+ // until LOCK_TIMEOUT_MS elapses.
+ while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+ if (metadataStore.get(streamCreatedStateKey) != null) {
+ LOG.info("Processor {} found streams created state data. They must've been created by another processor.", processorId);
+ return;
+ }
+ if (!distributedLock.lock(Duration.ofMillis(10000))) {
+ LOG.info("Processor {} failed to get the lock for stream initialization. Will try again until time out", processorId);
+ continue;
+ }
+ try {
+ // Re-check under the lock: another processor may have created the streams while we waited for it.
+ if (metadataStore.get(streamCreatedStateKey) != null) {
+ LOG.info("Processor {} found streams created state data after acquiring the lock. They must've been created by another processor.", processorId);
+ return;
+ }
+ LOG.info("lock acquired for streams creation by Processor " + processorId);
+ streamManager.createStreams(intStreams);
+ String streamCreatedMessage = "Streams created by processor " + processorId;
+ metadataStore.put(streamCreatedStateKey, streamCreatedMessage.getBytes("UTF-8"));
+ return;
+ } catch (UnsupportedEncodingException e) {
+ throw new SamzaException(String.format("Processor %s failed to encode string for stream initialization", processorId), e);
+ } finally {
+ // always release, even if stream creation or the state write fails
+ distributedLock.unlock();
+ }
+ }
+ // Only reached when the loop ran out of time without finding or creating the streams.
+ throw new TimeoutException(String.format("Processor %s failed to get the lock for stream initialization within %d milliseconds.", processorId, CoordinationConstants.LOCK_TIMEOUT_MS));
+ }
+
+ private MetadataStore getMetadataStore() {
+ String metadataStoreFactoryClass = appDesc.getConfig().get(METADATA_STORE_FACTORY_CONFIG);
+ if (metadataStoreFactoryClass == null) {
+ metadataStoreFactoryClass = DEFAULT_METADATA_STORE_FACTORY;
}
+ MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
+ return metadataStoreFactory.getMetadataStore(STREAM_CREATION_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap());
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7da3369..c0b85f2 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -42,14 +43,22 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.context.ExternalContext;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.RunIdGenerator;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,13 +68,20 @@ import org.slf4j.LoggerFactory;
public class LocalApplicationRunner implements ApplicationRunner {
private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
+ private static final String PROCESSOR_ID = UUID.randomUUID().toString();
+ private final static String RUN_ID_METADATA_STORE = "RunIdCoordinationStore";
+ private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory";
+ public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();
private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
- private final LocalJobPlanner planner;
private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final AtomicInteger numProcessorsToStart = new AtomicInteger();
private final AtomicReference<Throwable> failure = new AtomicReference<>();
+ private final boolean isAppModeBatch;
+ private final Optional<CoordinationUtils> coordinationUtils;
+ private Optional<String> runId = Optional.empty();
+ private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
private ApplicationStatus appStatus = ApplicationStatus.New;
@@ -77,20 +93,74 @@ public class LocalApplicationRunner implements ApplicationRunner {
*/
public LocalApplicationRunner(SamzaApplication app, Config config) {
this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
- this.planner = new LocalJobPlanner(appDesc);
+ isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+ // Must run after isAppModeBatch is set: coordination utils are only created in BATCH mode.
+ coordinationUtils = getCoordinationUtils(config);
}
/**
- * Constructor only used in unit test to allow injection of {@link LocalJobPlanner}
+ * Constructor only used in unit test to allow injection of {@link CoordinationUtils}
+ * (the planner itself is obtained through {@link #getPlanner()}, which tests can stub).
*/
@VisibleForTesting
- LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, LocalJobPlanner planner) {
+ LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) {
this.appDesc = appDesc;
- this.planner = planner;
+ isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+ this.coordinationUtils = coordinationUtils;
+ }
+
+ /**
+ * Creates the {@link CoordinationUtils} used to coordinate run.id generation across processors.
+ *
+ * @param config application config
+ * @return the coordination utils in BATCH mode (empty if the factory returned null); empty otherwise
+ */
+ private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
+ if (!isAppModeBatch) {
+ return Optional.empty();
+ }
+ JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+ CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config);
+ return Optional.ofNullable(coordinationUtils);
+ }
+
+ /**
+ * Builds the planner for this run. In BATCH mode the planner also receives the coordination utils and
+ * the run.id, either of which may be null when unavailable.
+ *
+ * @return LocalJobPlanner created
+ */
+ @VisibleForTesting
+ LocalJobPlanner getPlanner() {
+ // Reuse the mode computed in the constructor instead of re-parsing the config into a shadowing local.
+ if (!isAppModeBatch) {
+ return new LocalJobPlanner(appDesc, PROCESSOR_ID);
+ }
+ return new LocalJobPlanner(appDesc, coordinationUtils.orElse(null), PROCESSOR_ID, runId.orElse(null));
+ }
+
+ /**
+ * Generates (or reads) the run.id for this BATCH run using {@link RunIdGenerator}.
+ * Best-effort: on any failure the application continues without a run.id.
+ */
+ private void initializeRunId() {
+ if (!isAppModeBatch) {
+ LOG.info("Not BATCH mode and hence not generating run id");
+ return;
+ }
+
+ if (!coordinationUtils.isPresent()) {
+ LOG.warn("Coordination utils not present. Aborting run id generation. Will continue execution without a run id.");
+ return;
+ }
+
+ try {
+ MetadataStore metadataStore = getMetadataStore();
+ runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore));
+ runId = runIdGenerator.flatMap(RunIdGenerator::getRunId);
+ } catch (Exception e) {
+ // Pass the exception as the throwable argument so SLF4J logs its stack trace.
+ LOG.warn("Failed to generate run id. Will continue execution without a run id.", e);
+ }
+ }
+
+ /**
+ * @return the run.id of this run; empty when not in BATCH mode, before run(), or if generation failed
+ */
+ public Optional<String> getRunId() {
+ return this.runId;
}
@Override
public void run(ExternalContext externalContext) {
+ initializeRunId();
+
+ LocalJobPlanner planner = getPlanner();
+
try {
List<JobConfig> jobConfigs = planner.prepareJobs();
@@ -109,6 +179,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
// start the StreamProcessors
processors.forEach(StreamProcessor::start);
} catch (Throwable throwable) {
+ cleanup();
appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
shutdownLatch.countDown();
throw new SamzaException(String.format("Failed to start application: %s",
@@ -119,6 +190,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
@Override
public void kill() {
processors.forEach(StreamProcessor::stop);
+ // Release the run.id generator and the coordination utils held by this runner.
+ cleanup();
}
@Override
@@ -151,6 +223,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
throw new SamzaException(e);
}
+ cleanup();
return finished;
}
@@ -200,6 +273,17 @@ public class LocalApplicationRunner implements ApplicationRunner {
}
}
+ // Set once cleanup() has run. cleanup() is reachable from run() (on failure), kill(), waitForFinish() and the
+ // processor lifecycle listener, so it must tolerate repeated calls.
+ private boolean isCleanedUp = false;
+
+ /**
+ * Releases the run.id generator and the coordination utils, if present.
+ * Runs at most once; later calls are no-ops. Failures are logged rather than propagated, so callers
+ * (e.g. the lifecycle listener that then counts down the shutdown latch) are not aborted by a close error.
+ */
+ private synchronized void cleanup() {
+ if (isCleanedUp) {
+ return;
+ }
+ isCleanedUp = true;
+ try {
+ runIdGenerator.ifPresent(RunIdGenerator::close);
+ } catch (Exception e) {
+ LOG.warn("Failed to close the run id generator.", e);
+ }
+ try {
+ coordinationUtils.ifPresent(CoordinationUtils::close);
+ } catch (Exception e) {
+ LOG.warn("Failed to close the coordination utils.", e);
+ }
+ }
+
+ /**
+ * Creates the metadata store that persists the run.id, using the factory class configured under
+ * {@code metadata.store.factory} (default: {@link ZkMetadataStoreFactory}).
+ */
+ private MetadataStore getMetadataStore() {
+ String metadataStoreFactoryClass = appDesc.getConfig().getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY);
+ MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
+ return metadataStoreFactory.getMetadataStore(RUN_ID_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap());
+ }
+
/**
* Defines a specific implementation of {@link ProcessorLifecycleListener} for local {@link StreamProcessor}s.
*/
@@ -262,6 +346,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
userDefinedProcessorLifecycleListener.afterStop();
}
if (processors.isEmpty()) {
+ cleanup();
// no processor is still running. Notify callers waiting on waitForFinish()
shutdownLatch.countDown();
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java b/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java
new file mode 100644
index 0000000..0690821
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.coordinator.ClusterMembership;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper-backed {@link ClusterMembership}. Each registered processor is an ephemeral sequential node
+ * under {@code <rootPath>/processors}, so a registration disappears when the registering ZK session ends.
+ */
+public class ZkClusterMembership implements ClusterMembership {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ZkClusterMembership.class);
+ public static final String PROCESSORS_PATH = "processors";
+ private final ZkUtils zkUtils;
+ private final String processorsPath;
+ private final String participantId;
+ private final ZkKeyBuilder keyBuilder;
+
+ /**
+ * @param participantId data written into each registration node; must not be null
+ * @param zkUtils ZK utilities whose key builder supplies the root path; must not be null
+ */
+ public ZkClusterMembership(String participantId, ZkUtils zkUtils) {
+ Preconditions.checkNotNull(participantId, "ParticipantId cannot be null");
+ Preconditions.checkNotNull(zkUtils, "ZkUtils cannot be null");
+ this.zkUtils = zkUtils;
+ this.participantId = participantId;
+ this.keyBuilder = zkUtils.getKeyBuilder();
+ processorsPath = String.format("%s/%s", keyBuilder.getRootPath(), PROCESSORS_PATH);
+ zkUtils.validatePaths(new String[] {processorsPath});
+ }
+
+ /**
+ * Registers this processor by creating an ephemeral sequential node.
+ *
+ * @return the id parsed from the created node path; pass it to {@link #unregisterProcessor(String)}
+ */
+ @Override
+ public String registerProcessor() {
+ String nodePath = zkUtils.getZkClient().createEphemeralSequential(processorsPath + "/", participantId);
+ LOG.info("created ephemeral node. Registered the processor in the cluster.");
+ return ZkKeyBuilder.parseIdFromPath(nodePath);
+ }
+
+ /**
+ * @return the number of processors currently registered (children of the processors path)
+ */
+ @Override
+ public int getNumberOfProcessors() {
+ return zkUtils.getZkClient().getChildren(processorsPath).size();
+ }
+
+ /**
+ * Removes the registration node of {@code processorId}. A null id, or a node that no longer exists,
+ * is logged and otherwise ignored.
+ */
+ @Override
+ public void unregisterProcessor(String processorId) {
+ if (processorId == null) {
+ LOG.warn("Can not unregister processor with null processorId");
+ return;
+ }
+ String nodePath = processorsPath + "/" + processorId;
+ // delete() reports whether the node existed. This avoids a separate exists() check that can race with
+ // the ephemeral node vanishing (e.g. on session expiry) between the check and the delete.
+ if (zkUtils.getZkClient().delete(nodePath)) {
+ LOG.info("Ephemeral node deleted. Unregistered the processor from cluster membership.");
+ } else {
+ LOG.warn("Ephemeral node you want to delete doesn't exist. Processor with id {} is not currently registered.", processorId);
+ }
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 3d4a2d1..1e3b58f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -20,8 +20,9 @@ package org.apache.samza.zk;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.ClusterMembership;
import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
import org.slf4j.Logger;
@@ -52,7 +53,7 @@ public class ZkCoordinationUtils implements CoordinationUtils {
}
+ /**
+ * @return a new {@link ZkDistributedLock} for {@code lockId}, using this utils' processor id as participant id.
+ * Each call creates a new instance, and each instance subscribes its own child listener on the lock path.
+ */
@Override
- public DistributedLockWithState getLockWithState(String lockId) {
+ public DistributedLock getLock(String lockId) {
return new ZkDistributedLock(processorIdStr, zkUtils, lockId);
}
@@ -70,4 +71,9 @@ public class ZkCoordinationUtils implements CoordinationUtils {
public ZkUtils getZkUtils() {
return zkUtils;
}
+
+ /**
+ * @return a new {@link ZkClusterMembership} for this utils' processor id; a fresh instance is created on every call
+ */
+ @Override
+ public ClusterMembership getClusterMembership() {
+ return new ZkClusterMembership(processorIdStr, zkUtils);
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index e93f290..0cf93c9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -21,6 +21,7 @@ package org.apache.samza.zk;
import com.google.common.base.Strings;
import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.CoordinationUtils;
@@ -34,7 +35,8 @@ import org.slf4j.LoggerFactory;
public class ZkCoordinationUtilsFactory implements CoordinationUtilsFactory {
private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtilsFactory.class);
- public CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) {
+ public CoordinationUtils getCoordinationUtils(String coordinationId, String participantId, Config config) {
+ String groupId = new ApplicationConfig(config).getGlobalAppId() + "/" + coordinationId;
ZkConfig zkConfig = new ZkConfig(config);
ZkClient zkClient =
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
index cfb4641..24aa26e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
@@ -18,12 +18,10 @@
*/
package org.apache.samza.zk;
+import java.time.Duration;
import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +29,7 @@ import org.slf4j.LoggerFactory;
/**
* Distributed lock primitive for Zookeeper.
*/
-public class ZkDistributedLock implements DistributedLockWithState {
+public class ZkDistributedLock implements DistributedLock {
public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);
private static final String STATE_INITED = "sate_initialized";
@@ -39,72 +37,65 @@ public class ZkDistributedLock implements DistributedLockWithState {
private final String lockPath;
private final String participantId;
private final ZkKeyBuilder keyBuilder;
- private final Random random = new Random();
private String nodePath = null;
- private final String statePath;
+ // Monitor shared by lock() and ParticipantChangeHandler; final so every thread synchronizes on the same object.
+ private final Object mutex;
public ZkDistributedLock(String participantId, ZkUtils zkUtils, String lockId) {
this.zkUtils = zkUtils;
this.participantId = participantId;
this.keyBuilder = zkUtils.getKeyBuilder();
- lockPath = String.format("%s/stateLock_%s", keyBuilder.getRootPath(), lockId);
- statePath = String.format("%s/%s_%s", lockPath, STATE_INITED, lockId);
+ lockPath = String.format("%s/lock_%s", keyBuilder.getRootPath(), lockId);
zkUtils.validatePaths(new String[] {lockPath});
+ // Initialize the monitor before subscribing: the listener synchronizes on it.
+ mutex = new Object();
+ zkUtils.getZkClient().subscribeChildChanges(lockPath, new ParticipantChangeHandler(zkUtils));
}
/**
* Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out.
* Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock.
* @param timeout Duration of lock acquiring timeout.
- * @param unit Unit of the timeout defined above.
- * @return true if lock is acquired successfully, false if it times out.
+ * Between attempts it waits for a change to the lock's children, never longer than the time remaining.
+ * If the lock is not acquired (timeout or interrupt), the node created for this attempt is deleted so it
+ * cannot block other participants. On interrupt, the thread's interrupt status is restored.
+ * @return true if lock is acquired successfully else returns false if failed to acquire within timeout
*/
@Override
- public boolean lockIfNotSet(long timeout, TimeUnit unit)
- throws TimeoutException {
+ public boolean lock(Duration timeout) {
nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId);
//Start timer for timeout
long startTime = System.currentTimeMillis();
- long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ long lockTimeout = timeout.toMillis();
while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+ synchronized (mutex) {
+ List<String> children = zkUtils.getZkClient().getChildren(lockPath);
+ int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath));
- if (zkUtils.getZkClient().exists(statePath)) {
- // state already set, no point locking
- return false;
- }
-
- List<String> children = zkUtils.getZkClient().getChildren(lockPath);
- int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath));
-
- if (children.size() == 0 || index == -1) {
- throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
- }
- // Acquires lock when the node has the lowest sequence number and returns.
- if (index == 0) {
- LOG.info("Acquired lock for participant id: {}", participantId);
- return true;
- } else {
- try {
- Thread.sleep(random.nextInt(1000));
- } catch (InterruptedException e) {
- Thread.interrupted();
+ if (children.size() == 0 || index == -1) {
+ throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
+ }
+ // Acquires lock when the node has the lowest sequence number and returns.
+ if (index == 0) {
+ LOG.info("Acquired lock for participant id: {}", participantId);
+ return true;
+ }
+ // Wait for a change to the lock's children, but never past the overall deadline.
+ long remainingMs = lockTimeout - (System.currentTimeMillis() - startTime);
+ if (remainingMs <= 0) {
+ break;
+ }
+ try {
+ mutex.wait(remainingMs);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status and give up; retrying would only spin until the deadline.
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting to acquire lock for participant id: {}", participantId);
+ break;
}
- LOG.info("Trying to acquire lock again...");
+ LOG.info("Trying to acquire lock again...");
}
}
- throw new TimeoutException("could not acquire lock for " + timeout + " " + unit.toString());
+ LOG.info("Failed to acquire lock within {} milliseconds.", lockTimeout);
+ // Remove our queue node. Otherwise it eventually reaches the head of the queue and blocks every other
+ // participant until this session ends.
+ zkUtils.getZkClient().delete(nodePath);
+ nodePath = null;
+ return false;
}
/**
* Unlocks, by deleting the ephemeral sequential node created to acquire the lock.
*/
@Override
- public void unlockAndSet() {
- // set state
- zkUtils.getZkClient().createPersistent(statePath, true);
+ public void unlock() {
if (nodePath != null) {
zkUtils.getZkClient().delete(nodePath);
@@ -114,4 +105,29 @@ public class ZkDistributedLock implements DistributedLockWithState {
LOG.warn("Ephemeral lock node you want to delete doesn't exist");
}
}
+
+ /**
+ * Listener for changes in the children of the lock path. The children are the ephemeral sequential nodes
+ * created to acquire the lock. Any change wakes the threads waiting in {@link ZkDistributedLock#lock(Duration)},
+ * so they re-check whether their node is now at the head of the queue.
+ */
+ class ParticipantChangeHandler extends ZkUtils.GenerationAwareZkChildListener {
+
+ public ParticipantChangeHandler(ZkUtils zkUtils) {
+ super(zkUtils, "ParticipantChangeHandler");
+ }
+
+ // Called when the children of the given path changed.
+ @Override
+ public void doHandleChildChange(String parentPath, List<String> currentChildren)
+ throws Exception {
+ synchronized (mutex) {
+ if (currentChildren == null) {
+ LOG.warn("handleChildChange on path {} was invoked with NULL list of children", parentPath);
+ } else {
+ LOG.info("ParticipantChangeHandler::handleChildChange - Path: {} Current Children: {} ", parentPath, currentChildren);
+ // Wake every waiter: each one re-checks its own position, so a single notify() could wake the wrong thread.
+ mutex.notifyAll();
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
index a9c979d..5aaa261 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
@@ -18,7 +18,9 @@
*/
package org.apache.samza.zk;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistry;
@@ -31,6 +33,8 @@ public class ZkMetadataStoreFactory implements MetadataStoreFactory {
@Override
public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
- return new ZkMetadataStore(namespace, config, metricsRegistry);
+ // Store data under /<globalAppId>/<APPLICATION_RUNNER_PATH_SUFFIX>/<namespace>.
+ // NOTE(review): this root applies to every namespace requested from this factory, not only the run.id store.
+ // Confirm that other callers of this factory expect the relocated path.
+ String globalAppId = new ApplicationConfig(config).getGlobalAppId();
+ String metadataStoreBaseDir = "/" + globalAppId + "/" + CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX + "/" + namespace;
+ return new ZkMetadataStore(metadataStoreBaseDir, config, metricsRegistry);
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
new file mode 100644
index 0000000..39bc583
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.metadatastore.MetadataStore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for {@link RunIdGenerator} using mocked coordination primitives and metadata store.
+ */
+public class TestRunIdGenerator {
+
+ private static final String FAKE_RUNID = "FAKE_RUNID";
+ private RunIdGenerator runIdGenerator;
+ private CoordinationUtils coordinationUtils;
+ private DistributedLock distributedLock;
+ private ClusterMembership membership;
+ private MetadataStore metadataStore;
+
+ @Test
+ public void testSingleProcessorWriteRunId() throws Exception {
+ // When there is a single processor registered with ClusterMembership
+ // RunIdGenerator should write a new run id to the MetadataStore
+
+ prepareRunIdGenerator(1);
+
+ runIdGenerator.getRunId();
+
+ verify(coordinationUtils, Mockito.times(1)).getClusterMembership();
+ verify(coordinationUtils, Mockito.times(1)).getLock(anyString());
+ verify(distributedLock, Mockito.times(1)).lock(anyObject());
+ verify(distributedLock, Mockito.times(1)).unlock();
+ verify(membership, Mockito.times(1)).registerProcessor();
+ verify(membership, Mockito.times(1)).getNumberOfProcessors();
+ verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+ }
+
+ @Test
+ public void testTwoProcessorsReadRunId() throws Exception {
+ // When there are two processors registered with ClusterMembership
+ // RunIdGenerator should read run id from the MetadataStore
+
+ prepareRunIdGenerator(2);
+
+ String runId = runIdGenerator.getRunId().get();
+
+ // JUnit's assertEquals takes (message, expected, actual).
+ assertEquals("Runid was not read from store", FAKE_RUNID, runId);
+
+ verify(coordinationUtils, Mockito.times(1)).getClusterMembership();
+ verify(coordinationUtils, Mockito.times(1)).getLock(anyString());
+ verify(distributedLock, Mockito.times(1)).lock(anyObject());
+ verify(distributedLock, Mockito.times(1)).unlock();
+ verify(membership, Mockito.times(1)).registerProcessor();
+ verify(membership, Mockito.times(1)).getNumberOfProcessors();
+ verify(metadataStore, Mockito.times(1)).get(CoordinationConstants.RUNID_STORE_KEY);
+ }
+
+ /**
+ * Wires a RunIdGenerator to mocks whose lock always succeeds, whose membership reports
+ * {@code numberOfProcessors} processors, and whose store returns {@link #FAKE_RUNID} for the run.id key.
+ */
+ private void prepareRunIdGenerator(int numberOfProcessors) throws Exception {
+
+ coordinationUtils = mock(CoordinationUtils.class);
+
+ distributedLock = mock(DistributedLock.class);
+ when(distributedLock.lock(anyObject())).thenReturn(true);
+ when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock);
+
+ membership = mock(ClusterMembership.class);
+ when(membership.getNumberOfProcessors()).thenReturn(numberOfProcessors);
+ when(coordinationUtils.getClusterMembership()).thenReturn(membership);
+
+ metadataStore = mock(MetadataStore.class);
+ when(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY)).thenReturn(FAKE_RUNID.getBytes("UTF-8"));
+
+ runIdGenerator = spy(new RunIdGenerator(coordinationUtils, metadataStore));
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
index 5a9b634..7e2ca08 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
@@ -30,8 +30,10 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
import org.apache.samza.system.StreamSpec;
+import org.apache.samza.zk.ZkMetadataStore;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
@@ -42,7 +44,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -58,7 +59,7 @@ import static org.mockito.Mockito.when;
* TODO: consolidate this with unit tests for ExecutionPlanner after SAMZA-1811
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class})
+@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class, ZkMetadataStoreFactory.class})
public class TestLocalJobPlanner {
private static final String PLAN_JSON =
@@ -73,7 +74,9 @@ public class TestLocalJobPlanner {
@Test
public void testStreamCreation()
throws Exception {
- localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+ StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
+ doReturn(mock(Config.class)).when(appDesc).getConfig();
+ localPlanner = createLocalJobPlanner(appDesc);
StreamManager streamManager = mock(StreamManager.class);
doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
@@ -102,7 +105,9 @@ public class TestLocalJobPlanner {
@Test
public void testStreamCreationWithCoordination()
throws Exception {
- localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+ StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
+ doReturn(mock(Config.class)).when(appDesc).getConfig();
+ localPlanner = createLocalJobPlanner(appDesc);
StreamManager streamManager = mock(StreamManager.class);
doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
@@ -118,9 +123,9 @@ public class TestLocalJobPlanner {
when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
- DistributedLockWithState lock = mock(DistributedLockWithState.class);
- when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
- when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock);
+ DistributedLock lock = mock(DistributedLock.class);
+ when(lock.lock(anyObject())).thenReturn(true);
+ when(coordinationUtils.getLock(anyString())).thenReturn(lock);
when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject()))
.thenReturn(coordinationUtils);
@@ -191,8 +196,17 @@ public class TestLocalJobPlanner {
planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
}
- private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
- return spy(new LocalJobPlanner(appDesc));
+ /**
+ * Creates a spied LocalJobPlanner backed by a mocked CoordinationUtils whose lock always succeeds.
+ * Construction of ZkMetadataStore is stubbed with PowerMockito.whenNew to return a mock whose get() yields null.
+ */
+ private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) throws Exception {
+ CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
+ DistributedLock distributedLock = mock(DistributedLock.class);
+ when(distributedLock.lock(anyObject())).thenReturn(true);
+ when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock);
+
+ ZkMetadataStore zkMetadataStore = mock(ZkMetadataStore.class);
+ when(zkMetadataStore.get(any())).thenReturn(null);
+ PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(zkMetadataStore);
+
+ return spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
}
private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 5e91a2a..3ccf587 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -37,42 +37,54 @@ import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ExternalContext;
+import org.apache.samza.coordinator.ClusterMembership;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.DistributedLock;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.task.IdentityStreamTask;
+import org.apache.samza.zk.ZkMetadataStore;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.*;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LocalJobPlanner.class, LocalApplicationRunner.class, ZkMetadataStoreFactory.class})
public class TestLocalApplicationRunner {
private Config config;
private SamzaApplication mockApp;
private LocalApplicationRunner runner;
private LocalJobPlanner localPlanner;
+ private CoordinationUtils coordinationUtils;
+ private ZkMetadataStore metadataStore;
+ private ClusterMembership clusterMembership;
@Before
- public void setUp() {
+ public void setUp() throws Exception {
+ // Defaults: empty config and a mocked StreamApplication; prepareTest() builds a spied runner and planner.
config = new MapConfig();
mockApp = mock(StreamApplication.class);
prepareTest();
}
@Test
- public void testRunStreamTask() {
+ public void testRunStreamTask() throws Exception {
final Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
cfgs.put(ApplicationConfig.APP_NAME, "test-app");
@@ -105,7 +117,7 @@ public class TestLocalApplicationRunner {
}
@Test
- public void testRunStreamTaskWithoutExternalContext() {
+ public void testRunStreamTaskWithoutExternalContext() throws Exception {
final Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
cfgs.put(ApplicationConfig.APP_NAME, "test-app");
@@ -136,7 +148,7 @@ public class TestLocalApplicationRunner {
}
@Test
- public void testRunComplete() {
+ public void testRunComplete() throws Exception {
Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
config = new MapConfig(cfgs);
@@ -172,7 +184,7 @@ public class TestLocalApplicationRunner {
}
@Test
- public void testRunFailure() {
+ public void testRunFailure() throws Exception {
Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.PROCESSOR_ID, "0");
config = new MapConfig(cfgs);
@@ -247,11 +259,97 @@ public class TestLocalApplicationRunner {
LocalApplicationRunner.createProcessorId(mockConfig);
}
- private void prepareTest() {
+ /**
+ * Builds a spied runner whose getPlanner() returns a spied LocalJobPlanner backed by mocked coordination
+ * utils (the lock always succeeds) and a stubbed ZkMetadataStore. Assigns the coordinationUtils field
+ * instead of shadowing it with a local, so tests can verify interactions on it.
+ */
+ private void prepareTest() throws Exception {
+ coordinationUtils = mock(CoordinationUtils.class);
+
+ DistributedLock distributedLock = mock(DistributedLock.class);
+ when(distributedLock.lock(anyObject())).thenReturn(true);
+ when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock);
+
+ ZkMetadataStore zkMetadataStore = mock(ZkMetadataStore.class);
+ when(zkMetadataStore.get(any())).thenReturn(null);
+ PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(zkMetadataStore);
+
+ ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
+ ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);
+ localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
+ runner = spy(new LocalApplicationRunner(appDesc, Optional.of(coordinationUtils)));
+ doReturn(localPlanner).when(runner).getPlanner();
+ }
+
+
+ /**
+ * For app.mode=BATCH ensure that the run.id generation utils --
+ * DistributedLock, ClusterMembership and MetadataStore are used: the run.id lock is requested once,
+ * the number of processors is read once, and, because the mocked membership reports a single
+ * processor, metadataStore.put is invoked once to write the run.id.
+ * @throws Exception if test setup or the run fails
+ */
+ @Test
+ public void testRunIdForBatch() throws Exception {
+ final Map<String, String> cfgs = new HashMap<>();
+ cfgs.put(ApplicationConfig.APP_MODE, "BATCH");
+ cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+ cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
+ cfgs.put(JobConfig.JOB_ID(), "jobId");
+ config = new MapConfig(cfgs);
+ mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
+
+ prepareTestForRunId();
+ runner.run();
+
+ verify(coordinationUtils, Mockito.times(1)).getLock(CoordinationConstants.RUNID_LOCK_ID);
+ verify(clusterMembership, Mockito.times(1)).getNumberOfProcessors();
+ verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+ }
+
+ /**
+ * For app.mode=STREAM ensure that the run.id generation utils --
+ * DistributedLock, ClusterMembership and MetadataStore are NOT used,
+ * and that metadataStore.put is NOT invoked.
+ * @throws Exception if test setup or the run fails
+ */
+ @Test
+ public void testRunIdForStream() throws Exception {
+ final Map<String, String> cfgs = new HashMap<>();
+ cfgs.put(ApplicationConfig.APP_MODE, "STREAM");
+ cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+ cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
+ cfgs.put(JobConfig.JOB_ID(), "jobId");
+ config = new MapConfig(cfgs);
+ mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
+
+ prepareTestForRunId();
+
+ runner.run();
+
+
+ // In STREAM mode initializeRunId() returns before touching the coordination utils.
+ verify(coordinationUtils, Mockito.times(0)).getLock(CoordinationConstants.RUNID_LOCK_ID);
+ verify(coordinationUtils, Mockito.times(0)).getClusterMembership();
+ verify(clusterMembership, Mockito.times(0)).getNumberOfProcessors();
+ verify(metadataStore, Mockito.times(0)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+ }
+
+ /**
+ * Mocks the run.id collaborators: the lock is always acquired, one processor is registered, and
+ * ZkMetadataStore construction is stubbed via PowerMockito.whenNew. Then builds a spied runner whose
+ * planner and StreamProcessor are stubbed, so that run() does not start real processors.
+ */
+ private void prepareTestForRunId() throws Exception {
+ coordinationUtils = mock(CoordinationUtils.class);
+
+ DistributedLock lock = mock(DistributedLock.class);
+ when(lock.lock(anyObject())).thenReturn(true);
+ when(coordinationUtils.getLock(anyString())).thenReturn(lock);
+
+ clusterMembership = mock(ClusterMembership.class);
+ when(clusterMembership.getNumberOfProcessors()).thenReturn(1);
+ when(coordinationUtils.getClusterMembership()).thenReturn(clusterMembership);
+
+ metadataStore = mock(ZkMetadataStore.class);
+ when(metadataStore.get(any())).thenReturn(null);
+ PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(metadataStore);
+
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);
- localPlanner = spy(new LocalJobPlanner(appDesc));
- runner = spy(new LocalApplicationRunner(appDesc, localPlanner));
+ runner = spy(new LocalApplicationRunner(appDesc, Optional.of(coordinationUtils)));
+ localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
+ doReturn(localPlanner).when(runner).getPlanner();
+ StreamProcessor sp = mock(StreamProcessor.class);
+ doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), anyObject(), anyObject());
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
new file mode 100644
index 0000000..1775db5
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link ZkClusterMembership} against an embedded ZooKeeper server.
+ * Tests that simulate two processors give each one its own {@link ZkUtils} (backed by its own
+ * {@link ZkClient}), so the simulated processors do not share a ZooKeeper client.
+ */
+public class TestZkClusterMembership {
+ private static EmbeddedZookeeper zkServer = null;
+ private static String testZkConnectionString = null;
+ private ZkUtils zkUtils1;
+ private ZkUtils zkUtils2;
+
+ /** Starts one embedded ZooKeeper server that is shared by every test in this class. */
+ @BeforeClass
+ public static void setupZkServer() {
+ zkServer = new EmbeddedZookeeper();
+ zkServer.setup();
+ testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort());
+ }
+
+ /** Creates two independent ZooKeeper clients, both using the "group1" key builder. */
+ @Before
+ public void testSetup() {
+ ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+ ZkClient zkClient2 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils2 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient2, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+ }
+
+ @After
+ public void testTearDown() {
+ zkUtils1.close();
+ zkUtils2.close();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ zkServer.teardown();
+ }
+
+ @Test
+ public void testMembershipSingleProcessor() {
+ // Happy path: a single processor registers and then unregisters.
+ ZkClusterMembership clusterMembership = new ZkClusterMembership("p1", zkUtils1);
+
+ assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership.getNumberOfProcessors());
+
+ String processorId = clusterMembership.registerProcessor();
+
+ assertEquals("ClusterMembership does not have participants after a processor registered.", 1, clusterMembership.getNumberOfProcessors());
+
+ clusterMembership.unregisterProcessor(processorId);
+
+ assertEquals("ClusterMembership has participants after the single processor unregistered.", 0, clusterMembership.getNumberOfProcessors());
+ }
+
+ @Test
+ public void testMembershipTwoProcessors() {
+ // Two processors, each with its own ZooKeeper client, register; the second must see 2 processors.
+ ZkClusterMembership clusterMembership1 = new ZkClusterMembership("p1", zkUtils1);
+ ZkClusterMembership clusterMembership2 = new ZkClusterMembership("p2", zkUtils2);
+
+ assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership1.getNumberOfProcessors());
+
+ String processorId1 = clusterMembership1.registerProcessor();
+
+ assertEquals("ClusterMembership does not have participants after one processor registered.", 1, clusterMembership1.getNumberOfProcessors());
+
+ String processorId2 = clusterMembership2.registerProcessor();
+
+ assertEquals("ClusterMembership does not have 2 participants after two processor registered.", 2, clusterMembership2.getNumberOfProcessors());
+
+ clusterMembership1.unregisterProcessor(processorId1);
+ clusterMembership2.unregisterProcessor(processorId2);
+
+ assertEquals("ClusterMembership has participants after both processors unregistered.", 0, clusterMembership1.getNumberOfProcessors());
+ }
+
+ @Test
+ public void testMembershipFirstProcessorUnregister() {
+ // First processor unregisters before the second registers; the second must see only 1 processor.
+ ZkClusterMembership clusterMembership1 = new ZkClusterMembership("p1", zkUtils1);
+ ZkClusterMembership clusterMembership2 = new ZkClusterMembership("p2", zkUtils2);
+
+ assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership1.getNumberOfProcessors());
+
+ String processorId1 = clusterMembership1.registerProcessor();
+
+ assertEquals("ClusterMembership does not have participants after one processor registered.", 1, clusterMembership1.getNumberOfProcessors());
+
+ clusterMembership1.unregisterProcessor(processorId1);
+
+ String processorId2 = clusterMembership2.registerProcessor();
+
+ assertEquals("ClusterMembership does not have 1 participant after second processor registered.", 1, clusterMembership2.getNumberOfProcessors());
+
+ clusterMembership2.unregisterProcessor(processorId2);
+
+ assertEquals("ClusterMembership has participants after both processors unregistered.", 0, clusterMembership2.getNumberOfProcessors());
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
new file mode 100644
index 0000000..b5d85aa
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.time.Duration;
+import java.util.List;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link ZkDistributedLock} against an embedded ZooKeeper server, using two independent
+ * {@link ZkUtils} clients to stand in for two processors.
+ */
+public class TestZkDistributedLock {
+ // Upper bound on how long each lock(...) call in these tests may wait.
+ private static final Duration LOCK_TIMEOUT = Duration.ofMillis(10000);
+ private static EmbeddedZookeeper zkServer = null;
+ private static String testZkConnectionString = null;
+ private ZkUtils zkUtils1;
+ private ZkUtils zkUtils2;
+
+ /** Starts one embedded ZooKeeper server that is shared by every test in this class. */
+ @BeforeClass
+ public static void setupZkServer() {
+ zkServer = new EmbeddedZookeeper();
+ zkServer.setup();
+ testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort());
+ }
+
+ /** Creates two independent ZooKeeper clients, both using the "group1" key builder. */
+ @Before
+ public void testSetup() {
+ ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+ ZkClient zkClient2 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils2 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient2, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+ }
+
+ @After
+ public void testTearDown() {
+ zkUtils1.close();
+ zkUtils2.close();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ zkServer.teardown();
+ }
+
+ /**
+ * Returns the current children of the lock node for {@code lockId}, read through {@code zkUtils}.
+ * NOTE(review): the "lock_<id>" path under the group root is assumed to match ZkDistributedLock's layout.
+ */
+ private List<String> getParticipants(ZkUtils zkUtils, String lockId) {
+ // Build the path from the client actually being queried rather than a fixed field, so callers
+ // can pass whichever client is still open.
+ String lockPath = String.format("%s/lock_%s", zkUtils.getKeyBuilder().getRootPath(), lockId);
+ return zkUtils.getZkClient().getChildren(lockPath);
+ }
+
+ @Test
+ public void testLockSingleProcessor() {
+ String lockId = "FAKE_LOCK_ID_1";
+ ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
+
+ assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size());
+
+ boolean lock1Status = lock1.lock(LOCK_TIMEOUT);
+ assertEquals("Lock does not have 1 participant after first processor tries to lock.", 1, getParticipants(zkUtils1, lockId).size());
+ assertTrue("1st processor requesting to lock did not acquire the lock.", lock1Status);
+ lock1.unlock();
+ assertEquals("Lock still has participants after first processor unlocked.", 0, getParticipants(zkUtils1, lockId).size());
+ }
+
+ @Test
+ public void testLockTwoProcessors() {
+ // Second processor should acquire the lock after the first one unlocks.
+ String lockId = "FAKE_LOCK_ID_2";
+ ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
+ ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId);
+
+ assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size());
+
+ boolean lock1Status = lock1.lock(LOCK_TIMEOUT);
+ assertTrue("First processor requesting to lock did not acquire the lock.", lock1Status);
+ lock1.unlock();
+ boolean lock2Status = lock2.lock(LOCK_TIMEOUT);
+ assertTrue("Second processor requesting to lock did not acquire the lock.", lock2Status);
+ lock2.unlock();
+ assertEquals("Lock still has participants after processors unlocked.", 0, getParticipants(zkUtils1, lockId).size());
+ }
+
+ @Test
+ public void testLockFirstProcessorClosing() {
+ // First processor's client closes before it unlocks; the second processor should still acquire the lock.
+ String lockId = "FAKE_LOCK_ID_3";
+ ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
+ ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId);
+
+ assertEquals("Lock has participants before any processor tried to lock!", 0, getParticipants(zkUtils1, lockId).size());
+
+ boolean lock1Status = lock1.lock(LOCK_TIMEOUT);
+ assertTrue("First processor requesting to lock did not acquire the lock.", lock1Status);
+ // Simulate the first processor dying: close its client without unlocking.
+ zkUtils1.close();
+
+ boolean lock2Status = lock2.lock(LOCK_TIMEOUT);
+ assertTrue("Second processor requesting to lock did not acquire the lock.", lock2Status);
+ lock2.unlock();
+ assertEquals("Lock still has participants after processors unlocked.", 0, getParticipants(zkUtils2, lockId).size());
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 3d5f3b3..4d53222 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -57,7 +57,8 @@ public class TestZkMetadataStore {
public void beforeTest() {
String testZkConnectionString = String.format("%s:%s", LOCALHOST, zkServer.getPort());
Config zkConfig = new MapConfig(ImmutableMap.of(ZkConfig.ZK_CONNECT, testZkConnectionString));
- zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("/%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap());
+ // Fresh random 5-letter store identifier per test, passed as-is with no leading '/' (presumably isolates tests — confirm).
+ zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(RandomStringUtils.randomAlphabetic(5), zkConfig, new MetricsRegistryMap());
}
@After
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e7c3ef8..e40bbc3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -65,6 +65,7 @@ import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.test.harness.IntegrationTestHarness;
@@ -189,40 +190,48 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness {
}
}
- private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) {
+ private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId, boolean isBatch) {
List<String> inputSystemStreams = inputTopics.stream()
.map(topic -> String.format("%s.%s", TestZkLocalApplicationRunner.TEST_SYSTEM, topic))
.collect(Collectors.toList());
String coordinatorSystemName = "coordinatorSystem";
- Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
- .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
- .put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputSystemStreams))
- .put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM)
- .put(TaskConfig.IGNORED_EXCEPTIONS(), "*")
- .put(ZkConfig.ZK_CONNECT, zkConnect())
- .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY)
- .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY)
- .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY)
- .put(ApplicationConfig.APP_NAME, appName)
- .put(ApplicationConfig.APP_ID, appId)
- .put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner")
- .put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY)
- .put(JobConfig.JOB_NAME(), appName)
- .put(JobConfig.JOB_ID(), appId)
- .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
- .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true")
- .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
- .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
- .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true")
- .put("job.coordinator.system", coordinatorSystemName)
- .put("job.coordinator.replication.factor", "1")
- .build();
+ // run.id is only generated in BATCH mode; batch jobs here also run with host affinity disabled.
+ ImmutableMap.Builder<String, String> configBuilder = ImmutableMap.<String, String>builder()
+ .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
+ .put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputSystemStreams))
+ .put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM)
+ .put(TaskConfig.IGNORED_EXCEPTIONS(), "*")
+ .put(ZkConfig.ZK_CONNECT, zkConnect())
+ .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY)
+ .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY)
+ .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY)
+ .put(ApplicationConfig.APP_NAME, appName)
+ .put(ApplicationConfig.APP_ID, appId)
+ .put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner")
+ .put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY)
+ .put(JobConfig.JOB_NAME(), appName)
+ .put(JobConfig.JOB_ID(), appId)
+ .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
+ .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true")
+ .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
+ .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
+ .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, isBatch ? "false" : "true")
+ .put("job.coordinator.system", coordinatorSystemName)
+ .put("job.coordinator.replication.factor", "1");
+ if (isBatch) {
+ configBuilder.put(ApplicationConfig.APP_MODE, "BATCH");
+ }
+ Map<String, String> samzaContainerConfig = configBuilder.build();
Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(TestZkLocalApplicationRunner.TEST_SYSTEM, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
return applicationConfig;
}
+ private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) {
+ return buildStreamApplicationConfigMap(inputTopics, appName, appId, false); // non-batch (default mode) config
+ }
+
/**
* sspGrouper is set to GroupBySystemStreamPartitionFactory.
* Run a stream application(appRunner1) consuming messages from input topic(effectively one container).
@@ -981,6 +990,270 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness {
return taskAssignments;
}
+ /**
+ * Test that two processors coming up at the same time agree on a single run.id
+ * 1. bring up two processors
+ * 2. wait till they start consuming messages
+ * 3. check that a run.id was generated and that the first processor's run.id matches the second's
+ */
+ @Test
+ public void testAgreeingOnSameRunIdForBatch() throws InterruptedException {
+ publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+ Map<String, String> configMap =
+ buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+ applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+ applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+
+ // Create StreamApplication from configuration.
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+ CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+ ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+ applicationConfig1), applicationConfig1);
+ ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+ applicationConfig2), applicationConfig2);
+
+ executeRun(appRunner1, applicationConfig1);
+ executeRun(appRunner2, applicationConfig2);
+
+ processedMessagesLatch1.await();
+ processedMessagesLatch2.await();
+
+ // At this stage, both processors are running; check that their run.ids match.
+
+ LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+ LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+ // Guard against a vacuous pass: two empty Optionals would also compare equal below.
+ assertEquals("RunId was not generated for the first processor", true, localApplicationRunner1.getRunId().isPresent());
+ assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+ appRunner1.kill();
+ appRunner1.waitForFinish();
+ appRunner2.kill();
+ appRunner2.waitForFinish();
+ }
+
+
+ /**
+ * Test that a new processor joining an existing quorum gets the same run.id
+ * 1. bring up two processors
+ * 2. wait till they start consuming messages
+ * 3. bring up a third processor
+ * 4. wait till third processor starts consuming messages
+ * 5. check if third processor run.id matches that of first two
+ */
+ @Test
+ public void testNewProcessorGetsSameRunIdForBatch() throws InterruptedException {
+ publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+ Map<String, String> configMap =
+ buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+ applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+ applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+ applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
+ // Create StreamApplication from configuration.
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+ CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+ ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+ applicationConfig1), applicationConfig1);
+ ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+ applicationConfig2), applicationConfig2);
+
+ executeRun(appRunner1, applicationConfig1);
+ executeRun(appRunner2, applicationConfig2);
+
+ processedMessagesLatch1.await();
+ processedMessagesLatch2.await();
+
+ // At this stage, both processors are running; check that their run.ids match.
+
+ LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+ LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+ // Guard against a vacuous pass: empty Optionals would also compare equal below.
+ assertEquals("RunId was not generated for the first processor", true, localApplicationRunner1.getRunId().isPresent());
+ assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+ // Bring up a new processor
+ CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
+ ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null,
+ applicationConfig3), applicationConfig3);
+ executeRun(appRunner3, applicationConfig3);
+ processedMessagesLatch3.await();
+
+ // At this stage, the new processor is running.
+ // check if new processor's runId matches that of the older processors
+ LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3;
+ assertEquals("RunId of the new processor does not match that of old processor", localApplicationRunner3.getRunId(), localApplicationRunner1.getRunId());
+
+ appRunner1.kill();
+ appRunner1.waitForFinish();
+ appRunner2.kill();
+ appRunner2.waitForFinish();
+ appRunner3.kill();
+ appRunner3.waitForFinish();
+ }
+
+
+ /**
+ * Test that after one group of processors dies, a new processor coming up generates a new run.id
+ * 1. bring up two processors
+ * 2. wait till they start consuming messages
+ * 3. kill and shutdown neatly both the processors
+ * 4. bring up a new processor
+ * 5. wait till new processor starts consuming messages
+ * 6. check if new processor has new runid different from shutdown processors
+ */
+ @Test
+ public void testAllProcesssorDieNewProcessorGetsNewRunIdForBatch() throws InterruptedException {
+ publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+ Map<String, String> configMap =
+ buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+ applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+ applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+ applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
+ // Create StreamApplication from configuration.
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+ CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+ ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+ applicationConfig1), applicationConfig1);
+ ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+ applicationConfig2), applicationConfig2);
+
+ executeRun(appRunner1, applicationConfig1);
+ executeRun(appRunner2, applicationConfig2);
+
+ processedMessagesLatch1.await();
+ processedMessagesLatch2.await();
+
+ // At this stage, both the processors are running.
+ // check if their runId matches
+
+ LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+ LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+ assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+ String oldRunId = localApplicationRunner1.getRunId().get();
+
+ // shut down both the processors
+ appRunner1.kill();
+ appRunner1.waitForFinish();
+ appRunner2.kill();
+ appRunner2.waitForFinish();
+
+ // Bring up a new processor
+ CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
+ ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null,
+ applicationConfig3), applicationConfig3);
+ executeRun(appRunner3, applicationConfig3);
+ processedMessagesLatch3.await();
+
+ // At this stage, the new processor is running.
+ // check that the new processor's runId differs from that of the older processors
+ LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3;
+ // Unwrap the Optional: a String is never equal to an Optional, which would make this check always pass.
+ assertNotEquals("RunId of the new processor same as that of old stopped processors", oldRunId, localApplicationRunner3.getRunId().get());
+
+ appRunner3.kill();
+ appRunner3.waitForFinish();
+ }
+
+
+ /**
+ * Test that the first processor dying does not change the run.id for new processors joining
+ * 1. bring up two processors
+ * 2. wait till they start consuming messages
+ * 3. kill and shutdown first processor
+ * 4. bring up a new processor
+ * 5. wait till new processor starts consuming messages
+ * 6. check if new processor gets same run.id
+ */
+ @Test
+ public void testFirstProcessorDiesButSameRunIdForBatch() throws InterruptedException {
+ publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+ Map<String, String> configMap =
+ buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+ applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+ applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+ applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+ ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+ applicationConfig1), applicationConfig1);
+ executeRun(appRunner1, applicationConfig1);
+
+ // first processor is up and running
+ processedMessagesLatch1.await();
+
+ LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+
+ // bring up second processor
+ CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+ ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+ applicationConfig2), applicationConfig2);
+ executeRun(appRunner2, applicationConfig2);
+
+ // second processor is up and running
+ processedMessagesLatch2.await();
+ LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+ // Guard against a vacuous pass: empty Optionals would also compare equal below.
+ assertEquals("RunId was not generated for the first processor", true, localApplicationRunner1.getRunId().isPresent());
+ assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+ // shut down first processor
+ appRunner1.kill();
+ appRunner1.waitForFinish();
+
+ // Bring up a new processor
+ CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
+ ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+ TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null,
+ applicationConfig3), applicationConfig3);
+ executeRun(appRunner3, applicationConfig3);
+ processedMessagesLatch3.await();
+
+ // At this stage, the new processor is running; check that its run.id matches the surviving processor's.
+ LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3;
+ assertEquals("RunId of the new processor is not the same as that of earlier processors", localApplicationRunner2.getRunId(), localApplicationRunner3.getRunId());
+
+ appRunner2.kill();
+ appRunner2.waitForFinish();
+ appRunner3.kill();
+ appRunner3.waitForFinish();
+ }
+
+
private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
System.out.println(jobModel);
Set<SystemStreamPartition> ssps = new HashSet<>();