You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/11/14 20:45:32 UTC
samza git commit: SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes…
Repository: samza
Updated Branches:
refs/heads/master 3750f5e24 -> 6fcf7f3f4
SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes…
… in input topic partitions
Some highlights of the changes:
- StreamPartitionCountMonitor is now always instantiated on all input system streams
-- it is debatable whether we want to include systems that do not implement the optimized ExtendedSystemAdmin interface. We may need to configure a long partition monitor interval for that case and for jobs with a very large number of input topics. (Pending perf test)
- moved the instantiation of StreamPartitionCountMonitor out of JobModelManager and allowed ClusterBasedJobCoordinator to associate a callback method directly with the monitor
- allow callbacks to set a different application status code before throwing an exception to shut down the job
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Reviewers: Jacob Maes <jm...@linkedin.com>, Jagadish <ja...@apache.org>
Closes #351 from nickpan47/restart-on-partition-change and squashes the following commits:
8d04cd6 [Yi Pan (Data Infrastructure)] SAMZA-1482: restart or fail the job when input topic partition count changes
ee3fa65 [Yi Pan (Data Infrastructure)] SAMZA-1482: Restart or fail Samza jobs in YARN when detecting changes in input topic partitions
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6fcf7f3f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6fcf7f3f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6fcf7f3f
Branch: refs/heads/master
Commit: 6fcf7f3f4cbf8b9c6f69b292e3c1aaa239ab18d3
Parents: 3750f5e
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Nov 14 12:45:23 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 14 12:45:23 2017 -0800
----------------------------------------------------------------------
.../apache/samza/PartitionChangeException.java | 31 ++++
.../ClusterBasedJobCoordinator.java | 108 +++++++++--
.../clustermanager/ContainerProcessManager.java | 26 ++-
.../clustermanager/SamzaApplicationState.java | 7 +-
.../StreamPartitionCountMonitor.java | 116 +++++++-----
.../org/apache/samza/config/JobConfig.scala | 2 -
.../org/apache/samza/config/StorageConfig.scala | 8 +
.../samza/coordinator/JobModelManager.scala | 49 ++---
.../main/scala/org/apache/samza/util/Util.scala | 1 -
.../MockClusterResourceManagerFactory.java | 32 ++++
.../clustermanager/MockContainerListener.java | 1 +
.../TestClusterBasedJobCoordinator.java | 108 +++++++++++
.../clustermanager/TestContainerAllocator.java | 12 +-
.../TestContainerProcessManager.java | 22 +--
.../TestHostAwareContainerAllocator.java | 16 +-
.../coordinator/JobModelManagerTestUtil.java | 4 +-
.../samza/coordinator/TestJobModelManager.java | 24 +--
.../samza/storage/MockSystemConsumer.java | 59 ------
.../apache/samza/storage/MockSystemFactory.java | 45 -----
.../samza/storage/TestStorageRecovery.java | 37 +---
.../apache/samza/system/MockSystemFactory.java | 181 +++++++++++++++++++
.../samza/coordinator/TestJobCoordinator.scala | 65 +------
.../TestStreamPartitionCountMonitor.scala | 82 ++++++++-
.../TestRangeSystemStreamPartitionMatcher.scala | 1 -
.../TestRegexSystemStreamPartitionMatcher.scala | 1 -
25 files changed, 693 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
new file mode 100644
index 0000000..4619dfa
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/PartitionChangeException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+
+/**
+ * Exception to indicate that the input {@link org.apache.samza.system.SystemStreamPartition} changed
+ */
+public class PartitionChangeException extends SamzaException {
+
+ public PartitionChangeException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index d0d4e34..3d67cae 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -18,21 +18,36 @@
*/
package org.apache.samza.clustermanager;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
import org.apache.samza.SamzaException;
+import org.apache.samza.PartitionChangeException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.metrics.JmxServer;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* Implements a JobCoordinator that is completely independent of the underlying cluster
* manager system. This {@link ClusterBasedJobCoordinator} handles functionality common
@@ -69,10 +84,6 @@ public class ClusterBasedJobCoordinator {
*/
private final SamzaApplicationState state;
- /**
- * Metrics to track stats around container failures, needed containers etc.
- */
-
//even though some of these can be converted to local variables, it will not be the case
//as we add more methods to the JobCoordinator and completely implement SAMZA-881.
@@ -101,8 +112,30 @@ public class ClusterBasedJobCoordinator {
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ /**
+ * A boolean variable indicating whether the job has durable state stores in the configuration
+ */
+ private final boolean hasDurableStores;
+
+ /**
+ * The input topic partition count monitor
+ */
+ private final StreamPartitionCountMonitor partitionMonitor;
+
+ /**
+ * Metrics to track stats around container failures, needed containers etc.
+ */
+ private final MetricsRegistryMap metrics;
+
+ /**
+ * Internal variable for the instance of {@link JmxServer}
+ */
private JmxServer jmxServer;
+ /**
+ * Variable to keep the callback exception
+ */
+ volatile private Exception coordinatorException = null;
/**
* Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually
@@ -113,22 +146,23 @@ public class ClusterBasedJobCoordinator {
*/
public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
- MetricsRegistryMap registry = new MetricsRegistryMap();
+ metrics = new MetricsRegistryMap();
//build a JobModelReader and perform partition assignments.
- jobModelManager = buildJobModelManager(coordinatorSystemConfig, registry);
+ jobModelManager = buildJobModelManager(coordinatorSystemConfig, metrics);
config = jobModelManager.jobModel().getConfig();
+ hasDurableStores = new StorageConfig(config).hasDurableStores();
state = new SamzaApplicationState(jobModelManager);
+ partitionMonitor = getPartitionCountMonitor(config);
clusterManagerConfig = new ClusterManagerConfig(config);
isJmxEnabled = clusterManagerConfig.getJmxEnabled();
jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();
// build a container process Manager
- containerProcessManager = new ContainerProcessManager(config, state, registry);
+ containerProcessManager = new ContainerProcessManager(config, state, metrics);
}
-
/**
* Starts the JobCoordinator.
*
@@ -152,10 +186,11 @@ public class ClusterBasedJobCoordinator {
log.info("Starting Cluster Based Job Coordinator");
containerProcessManager.start();
+ partitionMonitor.start();
boolean isInterrupted = false;
- while (!containerProcessManager.shouldShutdown() && !isInterrupted) {
+ while (!containerProcessManager.shouldShutdown() && !checkAndThrowException() && !isInterrupted) {
try {
Thread.sleep(jobCoordinatorSleepInterval);
} catch (InterruptedException e) {
@@ -172,19 +207,25 @@ public class ClusterBasedJobCoordinator {
}
}
+ private boolean checkAndThrowException() throws Exception {
+ if (coordinatorException != null) {
+ throw coordinatorException;
+ }
+ return false;
+ }
+
/**
* Stops all components of the JobCoordinator.
*/
private void onShutDown() {
- if (containerProcessManager != null) {
- try {
- containerProcessManager.stop();
- } catch (Throwable e) {
- log.error("Exception while stopping task manager {}", e);
- }
- log.info("Stopped task manager");
+ try {
+ partitionMonitor.stop();
+ containerProcessManager.stop();
+ } catch (Throwable e) {
+ log.error("Exception while stopping task manager {}", e);
}
+ log.info("Stopped task manager");
if (jmxServer != null) {
try {
@@ -201,6 +242,41 @@ public class ClusterBasedJobCoordinator {
return jobModelManager;
}
+ private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) {
+ Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins();
+ StreamMetadataCache streamMetadata = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance());
+ Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
+ if (inputStreamsToMonitor.isEmpty()) {
+ throw new SamzaException("Input streams to a job can not be empty.");
+ }
+
+ return new StreamPartitionCountMonitor(
+ inputStreamsToMonitor,
+ streamMetadata,
+ metrics,
+ new JobConfig(config).getMonitorPartitionChangeFrequency(),
+ streamsChanged -> {
+ // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
+ if (hasDurableStores) {
+ log.error("Input topic partition count changed in a job with durable state. Failing the job.");
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ }
+ coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
+ });
+ }
+
+ // The following two methods are package-private and for testing only
+ @VisibleForTesting
+ SamzaApplicationState.SamzaAppStatus getAppStatus() {
+ // make sure to only return a unmodifiable copy of the status variable
+ final SamzaApplicationState.SamzaAppStatus copy = state.status;
+ return copy;
+ }
+
+ @VisibleForTesting
+ StreamPartitionCountMonitor getPartitionMonitor() {
+ return partitionMonitor;
+ }
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 2861e9e..6a18b84 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -33,6 +33,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
/**
* ContainerProcessManager is responsible for requesting containers, handling failures, and notifying the application master that the
* job is done.
@@ -85,9 +88,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
*/
private volatile boolean tooManyFailedContainers = false;
+ /**
+ * Exception thrown in callbacks, such as {@code containerAllocator}
+ */
private volatile Throwable exceptionOccurred = null;
-
/**
* A map that keeps track of how many times each container failed. The key is the container ID, and the
* value is the {@link ResourceFailure} object that has a count of failures.
@@ -95,9 +100,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
*/
private final Map<String, ResourceFailure> containerFailures = new HashMap<>();
+ /**
+ * Metrics for {@link ContainerProcessManager}
+ */
private final ContainerProcessManagerMetrics metrics;
-
public ContainerProcessManager(Config config,
SamzaApplicationState state,
MetricsRegistryMap registry) {
@@ -108,7 +115,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
- this.clusterResourceManager = factory.getClusterResourceManager(this, state);
+ this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
if (this.hostAffinityEnabled) {
@@ -189,6 +196,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
containerAllocator.stop();
try {
allocatorThread.join();
+ log.info("Stopped container allocator");
} catch (InterruptedException ie) {
log.error("Allocator Thread join() threw an interrupted exception", ie);
Thread.currentThread().interrupt();
@@ -197,19 +205,17 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
if (metrics != null) {
try {
metrics.stop();
+ log.info("Stopped metrics reporters");
} catch (Throwable e) {
log.error("Exception while stopping metrics {}", e);
}
- log.info("Stopped metrics reporters");
}
- if (clusterResourceManager != null) {
- try {
- clusterResourceManager.stop(state.status);
- } catch (Throwable e) {
- log.error("Exception while stopping cluster resource manager {}", e);
- }
+ try {
+ clusterResourceManager.stop(state.status);
log.info("Stopped cluster resource manager");
+ } catch (Throwable e) {
+ log.error("Exception while stopping cluster resource manager {}", e);
}
log.info("Finished stop of Container process manager");
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 653fb4e..adc6e51 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -42,6 +42,9 @@ public class SamzaApplicationState {
public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED }
+ /**
+ * {@link JobModelManager} object associated with this {@link SamzaApplicationState}
+ */
public final JobModelManager jobModelManager;
/**
@@ -102,9 +105,9 @@ public class SamzaApplicationState {
public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
/**
- * Final status of the application
+ * Final status of the application. Made to be volatile s.t. changes will be visible in multiple threads.
*/
- public SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
+ public volatile SamzaAppStatus status = SamzaAppStatus.UNDEFINED;
/**
* State indicating whether the job is healthy or not
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index b35cbff..16e8221 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -18,8 +18,10 @@
*/
package org.apache.samza.coordinator;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -28,7 +30,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
@@ -48,10 +50,10 @@ public class StreamPartitionCountMonitor {
private final Set<SystemStream> streamsToMonitor;
private final StreamMetadataCache metadataCache;
- private final MetricsRegistryMap metrics;
private final int monitorPeriodMs;
private final Map<SystemStream, Gauge<Integer>> gauges;
private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
+ private final Callback callbackMethod;
// Used to guard write access to state.
private final Object lock = new Object();
@@ -61,6 +63,19 @@ public class StreamPartitionCountMonitor {
private volatile State state = State.INIT;
+ /**
+ * A callback that is invoked when the {@link StreamPartitionCountMonitor} detects a change in the partition count of
+ * any of its {@link SystemStream}s.
+ */
+ public interface Callback {
+ /**
+ * Method to be called when SSP changes detected in the input
+ *
+ * @param streamsChanged the set of {@link SystemStream}s that have partition count changes
+ */
+ void onSystemStreamPartitionChange(Set<SystemStream> streamsChanged);
+ }
+
/**
* Gets the metadata for all the specified system streams from the provided metadata cache.
@@ -88,14 +103,15 @@ public class StreamPartitionCountMonitor {
* @param metadataCache the metadata cache which will be used to fetch metadata for partition counts.
* @param metrics the metrics registry to which the metrics should be added.
* @param monitorPeriodMs the period at which the monitor will run in milliseconds.
+ * @param monitorCallback the callback method to be invoked when partition count changes are detected
*/
public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache,
- MetricsRegistryMap metrics, int monitorPeriodMs) {
+ MetricsRegistry metrics, int monitorPeriodMs, Callback monitorCallback) {
this.streamsToMonitor = streamsToMonitor;
this.metadataCache = metadataCache;
- this.metrics = metrics;
this.monitorPeriodMs = monitorPeriodMs;
this.initialMetadata = getMetadata(streamsToMonitor, metadataCache);
+ this.callbackMethod = monitorCallback;
// Pre-populate the gauges
Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>();
@@ -109,48 +125,20 @@ public class StreamPartitionCountMonitor {
}
/**
- * Fetches the current partition count for each system stream from the cache, compares the current count to the
- * original count and updates the metric for that system stream with the delta.
- */
- void updatePartitionCountMetric() {
- try {
- Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache);
-
- for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
- SystemStream systemStream = metadataEntry.getKey();
- SystemStreamMetadata metadata = metadataEntry.getValue();
-
- int currentPartitionCount = currentMetadata.get(systemStream).getSystemStreamPartitionMetadata().keySet().size();
- int prevPartitionCount = metadata.getSystemStreamPartitionMetadata().keySet().size();
-
- Gauge gauge = gauges.get(systemStream);
- gauge.set(currentPartitionCount - prevPartitionCount);
- }
- } catch (Exception e) {
- log.error("Exception while updating partition count metric.", e);
- }
- }
-
- /**
- * For testing. Returns the metrics.
- */
- Map<SystemStream, Gauge<Integer>> getGauges() {
- return gauges;
- }
-
- /**
* Starts the monitor.
*/
public void start() {
synchronized (lock) {
switch (state) {
case INIT:
- schedulerService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- updatePartitionCountMetric();
- }
- }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
+ if (monitorPeriodMs > 0) {
+ schedulerService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ updatePartitionCountMetric();
+ }
+ }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
+ }
state = State.RUNNING;
break;
@@ -179,8 +167,53 @@ public class StreamPartitionCountMonitor {
}
/**
- * For testing.
+ * Fetches the current partition count for each system stream from the cache, compares the current count to the
+ * original count and updates the metric for that system stream with the delta.
+ */
+ @VisibleForTesting
+ public void updatePartitionCountMetric() {
+ try {
+ Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache);
+ Set<SystemStream> streamsChanged = new HashSet<>();
+
+ for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
+ try {
+ SystemStream systemStream = metadataEntry.getKey();
+ SystemStreamMetadata metadata = metadataEntry.getValue();
+
+ int currentPartitionCount = currentMetadata.get(systemStream).getSystemStreamPartitionMetadata().size();
+ int prevPartitionCount = metadata.getSystemStreamPartitionMetadata().size();
+
+ Gauge gauge = gauges.get(systemStream);
+ gauge.set(currentPartitionCount - prevPartitionCount);
+ if (currentPartitionCount != prevPartitionCount) {
+ log.warn(String.format("Change of partition count detected in stream %s. old partition count: %d, current partition count: %d",
+ systemStream.toString(), prevPartitionCount, currentPartitionCount));
+ streamsChanged.add(systemStream);
+ }
+ } catch (Exception e) {
+ log.error(String.format("Error comparing partition count differences for stream: %s", metadataEntry.getKey().toString()));
+ }
+ }
+
+ if (!streamsChanged.isEmpty() && this.callbackMethod != null) {
+ this.callbackMethod.onSystemStreamPartitionChange(streamsChanged);
+ }
+
+ } catch (Exception e) {
+ log.error("Exception while updating partition count metric.", e);
+ }
+ }
+
+ /**
+ * For testing. Returns the metrics.
*/
+ @VisibleForTesting
+ Map<SystemStream, Gauge<Integer>> getGauges() {
+ return gauges;
+ }
+
+ @VisibleForTesting
boolean isRunning() {
return state == State.RUNNING;
}
@@ -191,6 +224,7 @@ public class StreamPartitionCountMonitor {
* <p>
* This is currently exposed at the package private level for tests only.
*/
+ @VisibleForTesting
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return schedulerService.awaitTermination(timeout, unit);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 1b3b893..083dbaf 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -137,8 +137,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
}
}
- def getMonitorPartitionChange = getBoolean(JobConfig.MONITOR_PARTITION_CHANGE, false)
-
def getMonitorPartitionChangeFrequency = getInt(
JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS)
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index 0e3d568..ad03e59 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -98,4 +98,12 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
.map(systemStreamName => Util.getSystemStreamFromNames(systemStreamName.get).getSystem)
.contains(systemName)
}
+
+ /**
+ * Helper method to check if there is any stores configured w/ a changelog
+ */
+ def hasDurableStores : Boolean = {
+ val conf = config.subset("stores.", true)
+ conf.asScala.keys.exists(k => k.endsWith(".changelog"))
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 92c3663..e915a8a 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -43,17 +43,10 @@ import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.ExtendedSystemAdmin
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamPartitionMatcher
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.StreamSpec
+import org.apache.samza.system._
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
+import org.apache.samza.{Partition, PartitionChangeException, SamzaException}
import scala.collection.JavaConverters._
@@ -110,22 +103,8 @@ object JobModelManager extends Logging {
val systemAdmins = getSystemAdmins(config)
val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
- var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
- if (config.getMonitorPartitionChange) {
- val extendedSystemAdmins = systemAdmins.filter{
- case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
- }
- val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.contains(systemStream.getSystem))
- if (inputStreamsToMonitor.nonEmpty) {
- streamPartitionCountMonitor = new StreamPartitionCountMonitor(
- inputStreamsToMonitor.asJava,
- streamMetadataCache,
- metricsRegistryMap,
- config.getMonitorPartitionChangeFrequency)
- }
- }
val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
- val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, streamPartitionCountMonitor, null)
+ val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, null)
val jobModel = jobModelManager.jobModel
// Save the changelog mapping back to the ChangelogPartitionmanager
// newChangelogPartitionMapping is the merging of all current task:changelog
@@ -145,6 +124,13 @@ object JobModelManager extends Logging {
jobModelManager
}
+
+ /**
+ * This method creates a {@link JobModelManager} object w/o {@link StreamPartitionCountMonitor}
+ *
+ * @param coordinatorSystemConfig configuration for coordinator system
+ * @return a JobModelManager object
+ */
def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap())
/**
@@ -154,14 +140,13 @@ object JobModelManager extends Logging {
changeLogMapping: util.Map[TaskName, Integer],
localityManager: LocalityManager,
streamMetadataCache: StreamMetadataCache,
- streamPartitionCountMonitor: StreamPartitionCountMonitor,
containerIds: java.util.List[String]) = {
val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds)
jobModelRef.set(jobModel)
val server = new HttpServer
server.addServlet("/", new JobServlet(jobModelRef))
- currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor)
+ currentJobModelManager = new JobModelManager(jobModel, server)
currentJobModelManager
}
@@ -337,7 +322,6 @@ object JobModelManager extends Logging {
}
private def getSystemNames(config: Config) = config.getSystemNames.toSet
-
}
/**
@@ -361,8 +345,7 @@ class JobModelManager(
/**
* HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
*/
- val server: HttpServer = null,
- val streamPartitionCountMonitor: StreamPartitionCountMonitor = null) extends Logging {
+ val server: HttpServer = null) extends Logging {
debug("Got job model: %s." format jobModel)
@@ -370,10 +353,6 @@ class JobModelManager(
if (server != null) {
debug("Starting HTTP server.")
server.start
- if (streamPartitionCountMonitor != null) {
- debug("Starting Stream Partition Count Monitor..")
- streamPartitionCountMonitor.start()
- }
info("Started HTTP server: %s" format server.getUrl)
}
}
@@ -381,10 +360,6 @@ class JobModelManager(
def stop {
if (server != null) {
debug("Stopping HTTP server.")
- if (streamPartitionCountMonitor != null) {
- debug("Stopping Stream Partition Count Monitor..")
- streamPartitionCountMonitor.stop()
- }
server.stop
info("Stopped HTTP server.")
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 46bdb75..cc2a097 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -218,7 +218,6 @@ object Util extends Logging {
JobConfig.JOB_NAME -> jobName,
JobConfig.JOB_ID -> jobId,
JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
- JobConfig.MONITOR_PARTITION_CHANGE -> String.valueOf(config.getMonitorPartitionChange),
JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency))
new MapConfig(map.asJava)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
new file mode 100644
index 0000000..3a464c2
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+/**
+ * A {@link ResourceManagerFactory} for unit tests that always hands out a {@link MockClusterResourceManager}.
+ */
+public class MockClusterResourceManagerFactory implements ResourceManagerFactory {
+
+  /**
+   * Creates a {@link MockClusterResourceManager} wired to the supplied callback.
+   *
+   * @param callback passed through to the mock resource manager's constructor
+   * @param state not used by this mock factory
+   * @return a new {@link MockClusterResourceManager}
+   */
+  @Override
+  public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback,
+      SamzaApplicationState state) {
+    final ClusterResourceManager mockManager = new MockClusterResourceManager(callback);
+    return mockManager;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
index db70c38..3987288 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
new file mode 100644
index 0000000..264966d
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.MockSystemFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Tests for {@link ClusterBasedJobCoordinator}.
+ *
+ * <p>Both tests add a second partition to the input stream after the coordinator has been constructed, trigger the
+ * partition count monitor by hand, and then check the application status recorded by the coordinator.
+ */
+public class TestClusterBasedJobCoordinator {
+
+  Map<String, String> configMap;
+
+  /**
+   * Builds the base job config and registers partition 0 of the input stream {@code kafka.topic1} with the
+   * in-memory {@link MockSystemFactory}.
+   */
+  @Before
+  public void setUp() throws NoSuchFieldException, NoSuchMethodException {
+    configMap = new HashMap<>();
+    configMap.put("job.name", "test-job");
+    configMap.put("job.coordinator.system", "kafka");
+    configMap.put("task.inputs", "kafka.topic1");
+    configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.MockSystemFactory");
+    configMap.put("samza.cluster-manager.factory", "org.apache.samza.clustermanager.MockClusterResourceManagerFactory");
+    configMap.put("job.coordinator.monitor-partition-change.frequency.ms", "1");
+
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(0)), new ArrayList<>());
+  }
+
+  /**
+   * Clears the static queues shared through {@link MockSystemFactory} so that tests do not see each other's streams.
+   */
+  @After
+  public void tearDown() {
+    MockSystemFactory.MSG_QUEUES.clear();
+  }
+
+  /**
+   * With a changelog-backed store configured, a change in the input partition count is expected to set the
+   * application status to {@code FAILED}.
+   */
+  @Test
+  public void testPartitionCountMonitorWithDurableStates()
+      throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+    configMap.put("stores.mystore.changelog", "mychangelog");
+    Config config = new MapConfig(configMap);
+
+    // mimic job runner code to write the config to coordinator stream
+    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
+    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    producer.writeConfig("test-job", config);
+
+    ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
+
+    // change the input system stream metadata
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
+
+    StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
+    monitor.updatePartitionCountMetric();
+    // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages read correctly
+    assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, clusterCoordinator.getAppStatus());
+  }
+
+  /**
+   * Without any changelog-backed store, the same partition count change is expected to leave the application
+   * status at {@code UNDEFINED}.
+   */
+  @Test
+  public void testPartitionCountMonitorWithoutDurableStates() throws IllegalAccessException, InvocationTargetException {
+    Config config = new MapConfig(configMap);
+
+    // mimic job runner code to write the config to coordinator stream
+    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
+    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    producer.writeConfig("test-job", config);
+
+    ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
+
+    // change the input system stream metadata
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
+
+    StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
+    monitor.updatePartitionCountMetric();
+    // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages read correctly
+    assertEquals(SamzaApplicationState.SamzaAppStatus.UNDEFINED, clusterCoordinator.getAppStatus());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 1e9d372..734043a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -73,16 +73,16 @@ public class TestContainerAllocator {
private static Config getConfig() {
Config config = new MapConfig(new HashMap<String, String>() {
{
- put("yarn.container.count", "1");
- put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
- put("yarn.container.memory.mb", "512");
+ put("cluster-manager.container.count", "1");
+ put("cluster-manager.container.retry.count", "1");
+ put("cluster-manager.container.retry.window.ms", "1999999999");
+ put("cluster-manager.allocator.sleep.ms", "10");
+ put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
+ put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
- put("yarn.container.retry.count", "1");
- put("yarn.container.retry.window.ms", "1999999999");
- put("yarn.allocator.sleep.ms", "10");
}
});
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 4288aea..e252b7d 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -54,17 +54,17 @@ public class TestContainerProcessManager {
private Map<String, String> configVals = new HashMap<String, String>() {
{
- put("yarn.container.count", "1");
- put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
- put("yarn.container.memory.mb", "512");
+ put("cluster-manager.container.count", "1");
+ put("cluster-manager.container.retry.count", "1");
+ put("cluster-manager.container.retry.window.ms", "1999999999");
+ put("cluster-manager.allocator.sleep.ms", "1");
+ put("cluster-manager.container.request.timeout.ms", "2");
+ put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
+ put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
- put("yarn.container.retry.count", "1");
- put("yarn.container.retry.window.ms", "1999999999");
- put("yarn.allocator.sleep.ms", "1");
- put("yarn.container.request.timeout.ms", "2");
}
};
private Config config = new MapConfig(configVals);
@@ -117,8 +117,8 @@ public class TestContainerProcessManager {
public void testContainerProcessManager() throws Exception {
Map<String, String> conf = new HashMap<>();
conf.putAll(getConfig());
- conf.put("yarn.container.memory.mb", "500");
- conf.put("yarn.container.cpu.cores", "5");
+ conf.put("cluster-manager.container.memory.mb", "500");
+ conf.put("cluster-manager.container.cpu.cores", "5");
state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
ContainerProcessManager taskManager = new ContainerProcessManager(
@@ -137,8 +137,8 @@ public class TestContainerProcessManager {
conf.clear();
conf.putAll(getConfigWithHostAffinity());
- conf.put("yarn.container.memory.mb", "500");
- conf.put("yarn.container.cpu.cores", "5");
+ conf.put("cluster-manager.container.memory.mb", "500");
+ conf.put("cluster-manager.container.cpu.cores", "5");
state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1));
taskManager = new ContainerProcessManager(
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 32ec2d2..00198e9 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -328,18 +328,18 @@ public class TestHostAwareContainerAllocator {
private static Config getConfig() {
Config config = new MapConfig(new HashMap<String, String>() {
{
- put("yarn.container.count", "1");
- put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
- put("yarn.container.memory.mb", "512");
+ put("cluster-manager.container.count", "1");
+ put("cluster-manager.container.retry.count", "1");
+ put("cluster-manager.container.retry.window.ms", "1999999999");
+ put("cluster-manager.container.request.timeout.ms", "3");
+ put("cluster-manager.allocator.sleep.ms", "1");
+ put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
+ put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
- put("yarn.container.retry.count", "1");
- put("yarn.container.retry.window.ms", "1999999999");
- put("yarn.samza.host-affinity.enabled", "true");
- put("yarn.container.request.timeout.ms", "3");
- put("yarn.allocator.sleep.ms", "1");
+ put("job.host-affinity.enabled", "true");
}
});
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
index b7514c4..6a69889 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
@@ -49,7 +49,7 @@ public class JobModelManagerTestUtil {
containers.put(String.valueOf(i), container);
}
JobModel jobModel = new JobModel(config, containers, localityManager);
- return new JobModelManager(jobModel, server, null);
+ return new JobModelManager(jobModel, server);
}
public static JobModelManager getJobModelManagerUsingReadModel(Config config, int containerCount, StreamMetadataCache streamMetadataCache,
@@ -59,7 +59,7 @@ public class JobModelManagerTestUtil {
containerIds.add(String.valueOf(i));
}
JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), locManager, streamMetadataCache, containerIds);
- return new JobModelManager(jobModel, server, null);
+ return new JobModelManager(jobModel, server);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 1d6fc65..3130ed6 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -85,16 +85,16 @@ public class TestJobModelManager {
public void testLocalityMapWithHostAffinity() {
Config config = new MapConfig(new HashMap<String, String>() {
{
- put("yarn.container.count", "1");
- put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
- put("yarn.container.memory.mb", "512");
+ put("cluster-manager.container.count", "1");
+ put("cluster-manager.container.memory.mb", "512");
+ put("cluster-manager.container.retry.count", "1");
+ put("cluster-manager.container.retry.window.ms", "1999999999");
+ put("cluster-manager.allocator.sleep.ms", "10");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
+ put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
- put("yarn.container.retry.count", "1");
- put("yarn.container.retry.window.ms", "1999999999");
- put("yarn.allocator.sleep.ms", "10");
put("job.host-affinity.enabled", "true");
}
});
@@ -111,16 +111,16 @@ public class TestJobModelManager {
public void testLocalityMapWithoutHostAffinity() {
Config config = new MapConfig(new HashMap<String, String>() {
{
- put("yarn.container.count", "1");
- put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
- put("yarn.container.memory.mb", "512");
+ put("cluster-manager.container.count", "1");
+ put("cluster-manager.container.memory.mb", "512");
+ put("cluster-manager.container.retry.count", "1");
+ put("cluster-manager.container.retry.window.ms", "1999999999");
+ put("cluster-manager.allocator.sleep.ms", "10");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
+ put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
- put("yarn.container.retry.count", "1");
- put("yarn.container.retry.window.ms", "1999999999");
- put("yarn.allocator.sleep.ms", "10");
put("job.host-affinity.enabled", "false");
}
});
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
deleted file mode 100644
index 07c4a24..0000000
--- a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemStreamPartition;
-
-public class MockSystemConsumer implements SystemConsumer {
- public static Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
- private boolean flag = true; // flag to make sure the messages only are
- // returned once
-
- @Override
- public void start() {}
-
- @Override
- public void stop() {}
-
- @Override
- public void register(SystemStreamPartition systemStreamPartition, String offset) {}
-
- @Override
- public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
- if (flag) {
- ArrayList<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
- list.add(TestStorageRecovery.msg);
- messages.put(TestStorageRecovery.ssp, list);
- flag = false;
- return messages;
- } else {
- messages.clear();
- return messages;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
deleted file mode 100644
index 7abf82b..0000000
--- a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
-
-public class MockSystemFactory implements SystemFactory {
-
- @Override
- public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
- return new MockSystemConsumer();
- }
-
- @Override
- public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
- return null;
- }
-
- @Override
- public SystemAdmin getAdmin(String systemName, Config config) {
- return TestStorageRecovery.systemAdmin;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
index 21d0150..7c1647e 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -19,34 +19,25 @@
package org.apache.samza.storage;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.MockSystemFactory;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class TestStorageRecovery {
- public static SystemAdmin systemAdmin = null;
public Config config = null;
- public SystemStreamMetadata systemStreamMetadata = null;
- public SystemStreamMetadata inputSystemStreamMetadata = null;
private static final String SYSTEM_STREAM_NAME = "changelog";
private static final String INPUT_STREAM = "input";
private static final String STORE_NAME = "testStore";
@@ -57,16 +48,6 @@ public class TestStorageRecovery {
public void setup() throws InterruptedException {
putConfig();
putMetadata();
-
- systemAdmin = mock(SystemAdmin.class);
-
- Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME));
- Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM));
- HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<>();
- ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata);
- ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata);
- when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap);
- when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
}
@After
@@ -106,15 +87,9 @@ public class TestStorageRecovery {
}
private void putMetadata() {
- SystemStreamMetadata.SystemStreamPartitionMetadata sspm = new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2");
- HashMap<Partition, SystemStreamPartitionMetadata> map = new HashMap<Partition, SystemStreamPartitionMetadata>();
- map.put(new Partition(0), sspm);
- map.put(new Partition(1), sspm);
- systemStreamMetadata = new SystemStreamMetadata(SYSTEM_STREAM_NAME, map);
-
- HashMap<Partition, SystemStreamPartitionMetadata> map1 = new HashMap<Partition, SystemStreamPartitionMetadata>();
- map1.put(new Partition(0), sspm);
- map1.put(new Partition(1), sspm);
- inputSystemStreamMetadata = new SystemStreamMetadata(INPUT_STREAM, map1);
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(0)), new ArrayList<IncomingMessageEnvelope>() { { this.add(msg); } });
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(1)), new ArrayList<IncomingMessageEnvelope>() { { this.add(msg); } });
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", INPUT_STREAM, new Partition(0)), new ArrayList<>());
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("mockSystem", INPUT_STREAM, new Partition(1)), new ArrayList<>());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
new file mode 100644
index 0000000..a9c57da
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * A mock system backed by a set of in-memory queues. Used for testing without actual external messaging systems.
+ *
+ * <p>Every consumer, producer and admin created by this factory works on the single static {@link #MSG_QUEUES}
+ * map. The keys of that map are what the admin reports as the existing streams and partitions.
+ */
+public class MockSystemFactory implements SystemFactory {
+
+  /** Shared in-memory message queues, keyed by system stream partition. */
+  public static final Map<SystemStreamPartition, List<IncomingMessageEnvelope>> MSG_QUEUES = new ConcurrentHashMap<>();
+
+  /**
+   * Creates a consumer that reads from {@link #MSG_QUEUES}.
+   *
+   * @param systemName not used by the mock consumer
+   * @param config not used by the mock consumer
+   * @param registry not used by the mock consumer
+   * @return a consumer whose {@code poll} drains the shared queues
+   */
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    return new SystemConsumer() {
+      @Override
+      public void start() {
+      }
+
+      @Override
+      public void stop() {
+      }
+
+      /** Makes sure a (possibly empty) queue exists for the registered partition; the offset is ignored. */
+      @Override
+      public void register(SystemStreamPartition systemStreamPartition, String offset) {
+        MSG_QUEUES.putIfAbsent(systemStreamPartition, new ArrayList<>());
+      }
+
+      /**
+       * Removes the queue of every requested partition from {@link #MSG_QUEUES} and returns it. Partitions
+       * without a queue are mapped to an empty list. The timeout is ignored; this call never blocks.
+       */
+      @Override
+      public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
+        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> retQueues = new HashMap<>();
+        systemStreamPartitions.forEach(ssp -> {
+            // One remove() both detects a missing queue and takes the existing one, so a queue that
+            // disappears between a separate get() and remove() can no longer yield a null list.
+            List<IncomingMessageEnvelope> msgs = MSG_QUEUES.remove(ssp);
+            retQueues.put(ssp, msgs != null ? msgs : new ArrayList<IncomingMessageEnvelope>());
+          });
+        return retQueues;
+      }
+    };
+  }
+
+  /**
+   * Creates a producer that appends to {@link #MSG_QUEUES}.
+   *
+   * @param systemName not used by the mock producer
+   * @param config not used by the mock producer
+   * @param registry not used by the mock producer
+   * @return a producer whose {@code send} enqueues messages into the shared queues
+   */
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return new SystemProducer() {
+      private final Random seed = new Random(System.currentTimeMillis());
+
+      @Override
+      public void start() {
+
+      }
+
+      @Override
+      public void stop() {
+
+      }
+
+      @Override
+      public void register(String source) {
+      }
+
+      /**
+       * Enqueues the envelope into one partition of its target stream. The partition is chosen from the
+       * partition key if present, otherwise from the message key, otherwise at random. A stream that has
+       * no queue yet gets partition 0 created on the fly.
+       */
+      @Override
+      public void send(String source, OutgoingMessageEnvelope envelope) {
+        SystemStream systemStream = envelope.getSystemStream();
+        List<SystemStreamPartition> sspForSystem = MSG_QUEUES.keySet().stream()
+            .filter(ssp -> ssp.getSystemStream().equals(systemStream))
+            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
+        if (sspForSystem.isEmpty()) {
+          SystemStreamPartition firstPartition = new SystemStreamPartition(systemStream, new Partition(0));
+          MSG_QUEUES.putIfAbsent(firstPartition, new ArrayList<>());
+          sspForSystem.add(firstPartition);
+        }
+        int partitionCount = sspForSystem.size();
+        int partitionId;
+        // floorMod keeps the id in [0, partitionCount) even for negative hash codes; plain % would yield a
+        // negative partition id, for which no queue exists.
+        if (envelope.getPartitionKey() != null) {
+          partitionId = Math.floorMod(envelope.getPartitionKey().hashCode(), partitionCount);
+        } else if (envelope.getKey() != null) {
+          partitionId = Math.floorMod(envelope.getKey().hashCode(), partitionCount);
+        } else {
+          partitionId = this.seed.nextInt(partitionCount);
+        }
+        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, new Partition(partitionId));
+        // poll() removes queues from the map, so the chosen partition may have no queue at this point;
+        // recreate it instead of failing with a NullPointerException.
+        List<IncomingMessageEnvelope> msgQueue = MSG_QUEUES.computeIfAbsent(ssp, k -> new ArrayList<>());
+        msgQueue.add(new IncomingMessageEnvelope(ssp, null, envelope.getKey(), envelope.getMessage()));
+      }
+
+      @Override
+      public void flush(String source) {
+
+      }
+    };
+  }
+
+  /**
+   * Creates an admin whose stream metadata is derived from the keys of {@link #MSG_QUEUES}.
+   *
+   * @param systemName not used by the mock admin
+   * @param config not used by the mock admin
+   * @return an {@link ExtendedSystemAdmin} reporting the streams and partitions currently present in the queues
+   */
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new ExtendedSystemAdmin() {
+
+      /** Not supported by the mock; always returns {@code null}. */
+      @Override
+      public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+        return null;
+      }
+
+      /**
+       * Builds metadata for the requested streams from the partitions currently present in
+       * {@link #MSG_QUEUES}. Every partition gets empty-string offsets. Streams without any queue are
+       * absent from the result.
+       */
+      @Override
+      public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+        // NOTE(review): matching is by stream name only, so same-named streams of other systems are merged
+        // into one entry - confirm this is intended.
+        Map<String, Set<Partition>> partitionMap = new HashMap<>();
+        for (SystemStreamPartition ssp : MSG_QUEUES.keySet()) {
+          if (streamNames.contains(ssp.getStream())) {
+            partitionMap.computeIfAbsent(ssp.getStream(), k -> new HashSet<>()).add(ssp.getPartition());
+          }
+        }
+
+        Map<String, SystemStreamMetadata> metadataMap = new HashMap<>();
+        partitionMap.forEach((stream, partitions) -> {
+            Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetaMap = new HashMap<>();
+            for (Partition partition : partitions) {
+              partitionMetaMap.put(partition, new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", ""));
+            }
+            metadataMap.put(stream, new SystemStreamMetadata(stream, partitionMetaMap));
+          });
+
+        return metadataMap;
+      }
+
+      /** Not supported by the mock; always returns {@code null}. */
+      @Override
+      public Integer offsetComparator(String offset1, String offset2) {
+        return null;
+      }
+
+      /** Delegates to {@link #getSystemStreamMetadata(Set)}; the cache TTL is ignored. */
+      @Override
+      public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
+        return getSystemStreamMetadata(streamNames);
+      }
+
+      /** Not supported by the mock; always returns {@code null}. */
+      @Override
+      public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) {
+        return null;
+      }
+
+      /** Does nothing and reports success. */
+      @Override
+      public boolean createStream(StreamSpec streamSpec) {
+        return true;
+      }
+
+      /** Accepts every stream spec without checking it. */
+      @Override
+      public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
+
+      }
+    };
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index e6b148b..f092d75 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -26,8 +26,7 @@ import org.apache.samza.job.local.ProcessJobFactory
import org.apache.samza.job.local.ThreadJobFactory
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.util.Util
-import org.junit.After
-import org.junit.Test
+import org.junit.{After, Before, Test}
import org.junit.Assert._
import scala.collection.JavaConverters._
@@ -36,10 +35,7 @@ import org.apache.samza.config.TaskConfig
import org.apache.samza.config.SystemConfig
import org.apache.samza.container.SamzaContainer
import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.config.Config
import org.apache.samza.system._
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.job.model.JobModel
@@ -126,12 +122,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
- // Check the status of Stream Partition Count Monitor
- assertNotNull(coordinator.streamPartitionCountMonitor)
- assertTrue(coordinator.streamPartitionCountMonitor.isRunning())
-
coordinator.stop
- assertFalse(coordinator.streamPartitionCountMonitor.isRunning())
}
@Test
@@ -288,54 +279,16 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
}}.toMap
}
+ @Before
+ def setUp() {
+ // Set up the test stream metadata: register an empty message queue for partitions 0, 1 and 2 of stream "stream1" on system "test". NOTE(review): presumably the mock admin derives its stream metadata from the MSG_QUEUES keys -- confirm in MockSystemFactory.
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(0)), new util.ArrayList[IncomingMessageEnvelope]());
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(1)), new util.ArrayList[IncomingMessageEnvelope]());
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", "stream1", new Partition(2)), new util.ArrayList[IncomingMessageEnvelope]());
+ }
@After
def tearDown() = { // JUnit @After hook: runs after every test method in this class.
MockCoordinatorStreamSystemFactory.disableMockConsumerCache() // NOTE(review): presumably undoes a consumer cache that individual tests enable -- confirm against MockCoordinatorStreamSystemFactory.
}
-}
-
-class MockSystemFactory extends SystemFactory {
- def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = new SystemConsumer {
- def start() {}
- def stop() {}
- def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
- def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = new java.util.HashMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
- }
- def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = null
- def getAdmin(systemName: String, config: Config) = new MockSystemAdmin
-}
-
-class MockSystemAdmin extends ExtendedSystemAdmin {
- def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
- def getSystemStreamMetadata(streamNames: java.util.Set[String]): java.util.Map[String, SystemStreamMetadata] = {
- assertEquals(1, streamNames.size)
- val partitionMetadata = Map(
- new Partition(0) -> new SystemStreamPartitionMetadata(null, null, null),
- new Partition(1) -> new SystemStreamPartitionMetadata(null, null, null),
- // Create a new Partition(2), which wasn't in the prior changelog mapping.
- new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null))
- Map(streamNames.asScala.toList.head -> new SystemStreamMetadata("foo", partitionMetadata.asJava)).asJava
- }
-
- override def offsetComparator(offset1: String, offset2: String) = null
-
- override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
- cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
- assertEquals(1, streamNames.size())
- val result = streamNames.asScala.map {
- stream =>
- val partitionMetadata = Map(
- new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
- new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
- new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
- )
- stream -> new SystemStreamMetadata(stream, partitionMetadata.asJava)
- }.toMap
- result.asJava
- }
-
- override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = null
-
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index 7d15fd8..c7eab3b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -39,7 +39,7 @@ import scala.collection.JavaConverters._
class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
@Test
- def testStreamPartitionCountMonitor(): Unit = {
+ def testStreamPartitionCountChange(): Unit = {
val mockMetadataCache = mock[StreamMetadataCache]
val inputSystemStream = new SystemStream("test-system", "test-stream")
val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
@@ -70,11 +70,79 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
val metrics = new MetricsRegistryMap()
+ val mockCallback = mock[StreamPartitionCountMonitor.Callback]
+
+ val partitionCountMonitor = new StreamPartitionCountMonitor(
+ inputSystemStreamSet.asJava,
+ mockMetadataCache,
+ metrics,
+ 5,
+ mockCallback
+ )
+
+ partitionCountMonitor.updatePartitionCountMetric()
+
+ assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream))
+ assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
+
+ assertNotNull(metrics.getGroup("job-coordinator"))
+
+ val metricGroup = metrics.getGroup("job-coordinator")
+ assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
+ assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+
+ verify(mockCallback, times(1)).onSystemStreamPartitionChange(any())
+
+ }
+
+ @Test
+ def testStreamPartitionCountException(): Unit = {
+ val mockMetadataCache = mock[StreamMetadataCache]
+ val inputSystemStream = new SystemStream("test-system", "test-stream")
+ val inputExceptionStream = new SystemStream("test-system", "test-exception-stream")
+ val inputSystemStreamSet = Set[SystemStream](inputSystemStream, inputExceptionStream)
+
+ val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ {
+ put(new Partition(0), new SystemStreamPartitionMetadata("", "", ""))
+ put(new Partition(1), new SystemStreamPartitionMetadata("", "", ""))
+ }
+ }
+
+ val finalPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ {
+ putAll(initialPartitionMetadata)
+ put(new Partition(2), new SystemStreamPartitionMetadata("", "", ""))
+ }
+ }
+ val streamMockMetadata = mock[java.util.HashMap[Partition, SystemStreamPartitionMetadata]]
+
+ val initialMetadata = Map(
+ inputExceptionStream -> new SystemStreamMetadata(inputExceptionStream.getStream, initialPartitionMetadata),
+ inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, initialPartitionMetadata)
+ )
+ val finalMetadata = Map(
+ inputExceptionStream -> new SystemStreamMetadata(inputExceptionStream.getStream, streamMockMetadata),
+ inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, finalPartitionMetadata)
+ )
+
+ when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true)))
+ .thenReturn(initialMetadata) // Called during StreamPartitionCountMonitor instantiation
+ .thenReturn(finalMetadata) // Called from monitor thread the second time
+
+ // Make reading the partition metadata of inputExceptionStream fail: keySet() on its mocked partition map throws a RuntimeException
+ when(streamMockMetadata.keySet()).thenThrow(new RuntimeException)
+
+ val metrics = new MetricsRegistryMap()
+
+ val mockCallback = mock[StreamPartitionCountMonitor.Callback]
+
val partitionCountMonitor = new StreamPartitionCountMonitor(
inputSystemStreamSet.asJava,
mockMetadataCache,
metrics,
- 5
+ 5,
+ mockCallback
)
partitionCountMonitor.updatePartitionCountMetric()
@@ -87,6 +155,10 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
val metricGroup = metrics.getGroup("job-coordinator")
assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+
+ // Make sure the callback is invoked as long as a partition change is detected for at least one of the input streams
+ verify(mockCallback, times(1)).onSystemStreamPartitionChange(any())
+
}
@Test
@@ -98,7 +170,8 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
inputSystemStreamSet.asJava,
mockMetadataCache,
new MetricsRegistryMap(),
- 50
+ 50,
+ null
)
assertFalse(monitor.isRunning())
@@ -143,7 +216,8 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
inputSystemStreamSet.asJava,
mockMetadataCache,
new MetricsRegistryMap(),
- 50
+ 50,
+ null
) {
override def updatePartitionCountMetric(): Unit = {
sampleCount.countDown()
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
index a6d82e1..3d385d6 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
@@ -20,7 +20,6 @@ package org.apache.samza.system
import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
import org.apache.samza.config._
-import org.apache.samza.coordinator.MockSystemFactory
import org.apache.samza.job.local.ThreadJobFactory
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
http://git-wip-us.apache.org/repos/asf/samza/blob/6fcf7f3f/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
index b7f2119..255c85e 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
@@ -20,7 +20,6 @@ package org.apache.samza.system
import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
import org.apache.samza.config._
-import org.apache.samza.coordinator.MockSystemFactory
import org.apache.samza.job.local.ThreadJobFactory
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._