You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/05/23 23:41:38 UTC
[samza] branch master updated: SAMZA-2213: Adding writing of
container.metadata file,
and moving exception writing logic to DiagnosticsManager
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 66f29e8 SAMZA-2213: Adding writing of container.metadata file, and moving exception writing logic to DiagnosticsManager
66f29e8 is described below
commit 66f29e8287ae0f673a75b65db9c10db8e58819dc
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Thu May 23 16:41:30 2019 -0700
SAMZA-2213: Adding writing of container.metadata file, and moving exception writing logic to DiagnosticsManager
This PR
* Removes ListGauge from everywhere.
* Adds logic to LocalContainerRunner to write a metadata file, which is useful when combined with Java's -XX:OnOutOfMemoryError=<command> option.
* Adds DiagnosticsManager to publish error info to the diagnostics stream instead of via the metrics reporter.
* Adds logic to SamzaContainer to wire in the DiagnosticsManager, re-using the SystemProducer from the MetricsReporter.
* Cleans up ContainerProcessManagerMetrics.
* Adds DiagnosticsManager to ContainerProcessManager and the ClusterBasedJobCoordinator.
Author: Ray Matharu <rm...@linkedin.com>
Author: rmatharu <40...@users.noreply.github.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #1021 from rmatharu/job-metadata
---
.../versioned/operations/monitoring.md | 2 +-
.../org/apache/samza/metrics/MetricsRegistry.java | 9 --
.../org/apache/samza/metrics/MetricsVisitor.java | 6 +-
.../metrics/ReadableMetricsRegistryListener.java | 2 -
.../org/apache/samza/util/NoOpMetricsRegistry.java | 6 -
.../samza/system/eventhub/TestMetricsRegistry.java | 9 --
.../clustermanager/ClusterBasedJobCoordinator.java | 10 +-
.../clustermanager/ContainerProcessManager.java | 109 +++++++++----
.../java/org/apache/samza/metrics/MetricGroup.java | 4 -
.../apache/samza/processor/StreamProcessor.java | 20 ++-
.../apache/samza/runtime/ContainerLaunchUtil.java | 35 ++--
.../apache/samza/runtime/LocalContainerRunner.java | 12 +-
.../org/apache/samza/util/DiagnosticsUtil.java | 126 +++++++++++++++
.../apache/samza/util/MetricsReporterLoader.java | 12 +-
.../scala/org/apache/samza/config/JobConfig.scala | 18 ++-
.../org/apache/samza/config/MetricsConfig.scala | 2 +-
.../scala/org/apache/samza/config/TaskConfig.scala | 5 -
.../apache/samza/container/SamzaContainer.scala | 44 +++---
.../samza/container/SamzaContainerMetrics.scala | 2 -
.../org/apache/samza/diagnostics/BoundedList.java | 39 +++--
.../diagnostics/DiagnosticsExceptionEvent.java | 2 -
.../samza/diagnostics/DiagnosticsManager.java | 176 +++++++++++++++++++++
.../metrics/ContainerProcessManagerMetrics.scala | 89 ++++-------
.../org/apache/samza/metrics/MetricsHelper.scala | 2 -
.../apache/samza/metrics/MetricsRegistryMap.scala | 15 --
.../samza/metrics/reporter/JmxReporter.scala | 15 --
.../metrics/reporter/MetricsSnapshotReporter.scala | 2 -
.../reporter/MetricsSnapshotReporterFactory.scala | 23 +--
.../scala/org/apache/samza/util/FileUtil.scala | 47 +++++-
.../main/scala/org/apache/samza/util/Util.scala | 29 +++-
.../TestContainerProcessManager.java | 40 ++---
.../serializers/TestMetricsSnapshotSerdeV2.java | 8 +-
.../org/apache/samza/metrics/TestBoundedList.java | 37 ++---
.../logging/log4j/SimpleDiagnosticsAppender.java | 14 +-
.../logging/log4j2/SimpleDiagnosticsAppender.java | 14 +-
.../samza/sql/util/TestMetricsRegistryImpl.java | 8 -
.../java/org/apache/samza/config/YarnConfig.java | 2 +-
.../webapp/ApplicationMasterRestServlet.scala | 3 -
38 files changed, 675 insertions(+), 323 deletions(-)
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md
index ec8430c..bbad335 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -223,7 +223,7 @@ _Gauges_ are useful when measuring the magnitude of a certain system property, e
_Counters_ are useful in measuring metrics that are cumulative values, e.g., the number of messages processed since container startup. Certain counters are also useful when visualized with their rate-of-change, e.g., the rate of message processing.
-_Timers_ are useful for storing and reporting a sliding-window of timing values. Samza also supports a ListGauge type metric, which can be used to store and report a list of any primitive-type such as strings.
+_Timers_ are useful for storing and reporting a sliding-window of timing values.
## <a name="userdefinedmetrics"></a> C. Adding User-Defined Metrics
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index fa0fd39..5a00d01 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -65,15 +65,6 @@ public interface MetricsRegistry {
<T> Gauge<T> newGauge(String group, Gauge<T> value);
/**
- * Register a {@link org.apache.samza.metrics.ListGauge}
- * @param group Group for this ListGauge
- * @param listGauge the ListGauge to register
- * @param <T> Type of the ListGauge
- * @return ListGauge registered
- */
- <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge);
-
- /**
* Create and Register a new {@link org.apache.samza.metrics.Timer}
* @param group Group for this Timer
* @param name Name of to-be-created Timer
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
index 49a0929..40ebb3b 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
@@ -31,13 +31,9 @@ public abstract class MetricsVisitor {
public abstract void timer(Timer timer);
- public abstract <T> void listGauge(ListGauge<T> listGauge);
public void visit(Metric metric) {
- // Cast for metrics of type ListGauge
- if (metric instanceof ListGauge<?>) {
- listGauge((ListGauge<?>) metric);
- } else if (metric instanceof Counter) {
+ if (metric instanceof Counter) {
counter((Counter) metric);
} else if (metric instanceof Gauge<?>) {
gauge((Gauge<?>) metric);
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
index ba5b182..739d68f 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
@@ -24,7 +24,5 @@ public interface ReadableMetricsRegistryListener {
void onGauge(String group, Gauge<?> gauge);
- void onListGauge(String group, ListGauge<?> listGauge);
-
void onTimer(String group, Timer timer);
}
diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index 76b8216..068fe72 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -21,7 +21,6 @@ package org.apache.samza.util;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
@@ -52,11 +51,6 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
}
@Override
- public <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge) {
- return listGauge;
- }
-
- @Override
public Timer newTimer(String group, String name) {
return new Timer(name);
}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
index 01b69ed..48f6edd 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
@@ -25,7 +25,6 @@ import java.util.Map;
import org.apache.commons.collections4.map.HashedMap;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
@@ -33,7 +32,6 @@ public class TestMetricsRegistry implements MetricsRegistry {
private Map<String, List<Counter>> counters = new HashedMap<>();
private Map<String, List<Gauge<?>>> gauges = new HashedMap<>();
- private Map<String, List<ListGauge>> listGauges = new HashedMap<>();
public List<Counter> getCounters(String groupName) {
return counters.get(groupName);
@@ -70,13 +68,6 @@ public class TestMetricsRegistry implements MetricsRegistry {
}
@Override
- public ListGauge newListGauge(String group, ListGauge listGauge) {
- listGauges.putIfAbsent(group, new ArrayList());
- listGauges.get(group).add(listGauge);
- return listGauge;
- }
-
- @Override
public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
if (!gauges.containsKey(group)) {
gauges.put(group, new ArrayList<>());
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 782a62a..9044361 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -57,6 +57,7 @@ import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +91,7 @@ import scala.collection.JavaConverters;
public class ClusterBasedJobCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
+ private final static String METRICS_SOURCE_NAME = "ApplicationMaster";
private final Config config;
private final ClusterManagerConfig clusterManagerConfig;
@@ -221,7 +223,13 @@ public class ClusterBasedJobCoordinator {
// initialize JobCoordinator state
LOG.info("Starting cluster based job coordinator");
- // create necessary checkpoint and changelog streams, if not created
+ // write the diagnostics metadata file
+ String jobName = new JobConfig(config).getName().get();
+ String jobId = new JobConfig(config).getJobId();
+ Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv("CONTAINER_ID"));
+ DiagnosticsUtil.writeMetadataFile(jobName, jobId, METRICS_SOURCE_NAME, execEnvContainerId, config);
+
+ //create necessary checkpoint and changelog streams, if not created
JobModel jobModel = jobModelManager.jobModel();
MetadataResourceUtil metadataResourceUtil =
new MetadataResourceUtil(jobModel, metrics);
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 70b5579..5f48df5 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -19,20 +19,29 @@
package org.apache.samza.clustermanager;
import java.util.Optional;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.metrics.ContainerProcessManagerMetrics;
+import org.apache.samza.metrics.JvmMetrics;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
+import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import scala.Option;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -57,6 +66,13 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
private static final Logger log = LoggerFactory.getLogger(ContainerProcessManager.class);
/**
+ * Metrics for the {@link ContainerProcessManager}
+ */
+ private final static String METRICS_SOURCE_NAME = "ApplicationMaster";
+ private final static String EXEC_ENV_CONTAINER_ID_SYS_PROPERTY = "CONTAINER_ID";
+
+
+ /**
* Does this Samza Job need hostAffinity when containers are allocated.
*/
private final boolean hostAffinityEnabled;
@@ -81,6 +97,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
// The StandbyContainerManager manages standby-aware allocation and failover of containers
private final Optional<StandbyContainerManager> standbyContainerManager;
+ private final Option<DiagnosticsManager> diagnosticsManager;
+
/**
* A standard interface to request resources.
*/
@@ -102,11 +120,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
* value is the {@link ProcessorFailure} object that has a count of failures.
*/
private final Map<String, ProcessorFailure> processorFailures = new HashMap<>();
-
- /**
- * Metrics for the {@link ContainerProcessManager}
- */
- private final ContainerProcessManagerMetrics metrics;
+ private ContainerProcessManagerMetrics containerProcessManagerMetrics;
+ private JvmMetrics jvmMetrics;
+ private Map<String, MetricsReporter> metricsReporters;
public ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry) {
this.state = state;
@@ -117,8 +133,30 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
- this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+ // Initialize metrics
+ this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(config, state, registry);
+ this.jvmMetrics = new JvmMetrics(registry);
+ this.metricsReporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), METRICS_SOURCE_NAME);
+
+ // Creating diagnostics manager and reporter, and wiring it respectively
+ String jobName = new JobConfig(config).getName().get();
+ String jobId = new JobConfig(config).getJobId();
+ Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
+ Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, METRICS_SOURCE_NAME, execEnvContainerId, config);
+
+ if (diagnosticsManagerReporterPair.isPresent()) {
+ diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
+ metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue());
+ } else {
+ diagnosticsManager = Option.empty();
+ }
+
+ // Wire all metrics to all reporters
+ this.metricsReporters.values().forEach(reporter -> reporter.register(METRICS_SOURCE_NAME, registry));
+
+ // Enable standby container manager if required
if (jobConfig.getStandbyTasksEnabled()) {
this.standbyContainerManager = Optional.of(new StandbyContainerManager(state, clusterResourceManager));
} else {
@@ -135,23 +173,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
log.info("Finished container process manager initialization.");
}
- //for testing
- ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry,
- AbstractContainerAllocator allocator, ClusterResourceManager manager) {
- this.state = state;
- this.clusterManagerConfig = new ClusterManagerConfig(config);
- this.jobConfig = new JobConfig(config);
- this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
- this.clusterResourceManager = manager;
- this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
- this.containerAllocator = allocator;
- this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
- this.standbyContainerManager = Optional.empty();
- }
-
//package private, used only in tests
ContainerProcessManager(Config config, SamzaApplicationState state, MetricsRegistryMap registry,
- ClusterResourceManager resourceManager) {
+ ClusterResourceManager resourceManager, Optional<AbstractContainerAllocator> allocator) {
JobModelManager jobModelManager = state.jobModelManager;
this.state = state;
this.clusterManagerConfig = new ClusterManagerConfig(config);
@@ -160,10 +184,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
this.clusterResourceManager = resourceManager;
- this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
this.standbyContainerManager = Optional.empty();
+ this.diagnosticsManager = Option.empty();
- if (this.hostAffinityEnabled) {
+ if (allocator.isPresent()) {
+ this.containerAllocator = allocator.get();
+ } else if (this.hostAffinityEnabled) {
this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, this.standbyContainerManager, state);
} else {
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
@@ -186,7 +212,18 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
public void start() {
log.info("Starting the container process manager");
- metrics.start();
+
+ if (jvmMetrics != null) {
+ jvmMetrics.start();
+ }
+
+ if (this.metricsReporters != null) {
+ this.metricsReporters.values().forEach(reporter -> reporter.start());
+ }
+
+ if (this.diagnosticsManager.isDefined()) {
+ this.diagnosticsManager.get().start();
+ }
log.info("Starting the cluster resource manager");
clusterResourceManager.start();
@@ -217,16 +254,30 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
Thread.currentThread().interrupt();
}
- if (metrics != null) {
+ if (this.diagnosticsManager.isDefined()) {
try {
- metrics.stop();
- log.info("Stopped metrics reporters");
- } catch (Throwable e) {
- log.error("Exception while stopping metrics", e);
+ this.diagnosticsManager.get().stop();
+ } catch (InterruptedException e) {
+ log.error("InterruptedException while stopping diagnosticsManager", e);
}
}
try {
+
+ if (this.metricsReporters != null) {
+ this.metricsReporters.values().forEach(reporter -> reporter.stop());
+ }
+
+ if (this.jvmMetrics != null) {
+ jvmMetrics.stop();
+ }
+
+ log.info("Stopped containerProcessManagerMetrics reporters");
+ } catch (Throwable e) {
+ log.error("Exception while stopping containerProcessManagerMetrics", e);
+ }
+
+ try {
clusterResourceManager.stop(state.status);
log.info("Stopped the cluster resource manager");
} catch (Throwable e) {
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
index fc57846..53526d8 100644
--- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
+++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
@@ -44,10 +44,6 @@ public class MetricGroup {
return registry.newCounter(groupName, (prefix + name).toLowerCase());
}
- public <T> ListGauge<T> newListGauge(String name) {
- return registry.newListGauge(groupName, new ListGauge(name));
- }
-
public <T> Gauge<T> newGauge(String name, T value) {
return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value));
}
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 2a8c862..c946bb5 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -31,9 +31,12 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainerListener;
@@ -46,11 +49,14 @@ import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.runtime.ProcessorLifecycleListener;
import org.apache.samza.task.TaskFactory;
+import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ScalaJavaUtil;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
@@ -330,11 +336,23 @@ public class StreamProcessor {
@VisibleForTesting
SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
+
+ // Creating diagnostics manager and reporter, and wiring it respectively
+ String jobName = new JobConfig(config).getName().get();
+ String jobId = new JobConfig(config).getJobId();
+ Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair =
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, processorId, Optional.empty(), config);
+ Option<DiagnosticsManager> diagnosticsManager = Option.empty();
+ if (diagnosticsManagerReporterPair.isPresent()) {
+ diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
+ this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue());
+ }
+
return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
- Option.apply(this.externalContextOptional.orElse(null)), null, null);
+ Option.apply(this.externalContextOptional.orElse(null)), null, null, diagnosticsManager);
}
private JobCoordinator createJobCoordinator() {
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 564dbf3..978ab24 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -19,11 +19,16 @@
package org.apache.samza.runtime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.ContainerHeartbeatClient;
import org.apache.samza.container.ContainerHeartbeatMonitor;
@@ -36,20 +41,20 @@ import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ScalaJavaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
public class ContainerLaunchUtil {
private static final Logger log = LoggerFactory.getLogger(ContainerLaunchUtil.class);
@@ -64,18 +69,21 @@ public class ContainerLaunchUtil {
*/
public static void run(
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
- String containerId,
+ String jobName, String jobId, String containerId, Optional<String> execEnvContainerId,
JobModel jobModel) {
Config config = jobModel.getConfig();
- run(appDesc, containerId, jobModel, config, buildExternalContext(config));
+ run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel, config, buildExternalContext(config));
System.exit(0);
}
private static void run(
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+ String jobName,
+ String jobId,
String containerId,
+ Optional<String> execEnvContainerId,
JobModel jobModel,
Config config,
Optional<ExternalContext> externalContextOptional) {
@@ -90,15 +98,24 @@ public class ContainerLaunchUtil {
startpointManager = Optional.of(new StartpointManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, StartpointManager.NAMESPACE)));
}
+ Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
+
+ // Creating diagnostics manager and reporter, and wiring it respectively
+ Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, containerId, execEnvContainerId, config);
+ Option<DiagnosticsManager> diagnosticsManager = Option.empty();
+ if (diagnosticsManagerReporterPair.isPresent()) {
+ diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey());
+ metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS(), diagnosticsManagerReporterPair.get().getValue());
+ }
+
SamzaContainer container = SamzaContainer$.MODULE$.apply(
- containerId,
- jobModel,
- ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
+ containerId, jobModel,
+ ScalaJavaUtil.toScalaMap(metricsReporters),
taskFactory,
JobContextImpl.fromConfigWithDefaults(config),
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
- Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager.orElse(null));
+ Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager.orElse(null), diagnosticsManager);
ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
.createInstance(new ProcessorContext() { }, config);
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index a3e5acf..5197dbc 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -19,6 +19,8 @@
package org.apache.samza.runtime;
+import java.util.Optional;
+import java.util.Random;
import org.apache.samza.SamzaException;
import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
@@ -29,11 +31,12 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.SamzaUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import java.util.Random;
+
/**
* Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
@@ -56,6 +59,8 @@ public class LocalContainerRunner {
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
+ Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID()));
+
int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
Config config = jobModel.getConfig();
@@ -69,9 +74,12 @@ public class LocalContainerRunner {
MDC.put("jobName", jobName);
MDC.put("jobId", jobId);
+ DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
+
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
- ContainerLaunchUtil.run(appDesc, containerId, jobModel);
+ ContainerLaunchUtil.run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel);
}
+
}
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
new file mode 100644
index 0000000..3c05228
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Optional;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
+import org.apache.samza.runtime.LocalContainerRunner;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+public class DiagnosticsUtil {
+ private static final Logger log = LoggerFactory.getLogger(DiagnosticsUtil.class);
+
+ // Write a file in the samza.log.dir named {exec-env-container-id}.metadata that contains
+ // metadata about the container such as containerId, jobName, jobId, hostname, timestamp, version info, and others.
+ public static void writeMetadataFile(String jobName, String jobId, String containerId,
+ Optional<String> execEnvContainerId, Config config) {
+
+ Option<File> metadataFile = JobConfig.getMetadataFile(Option.apply(execEnvContainerId.orElse(null)));
+
+ if (metadataFile.isDefined()) {
+
+ StringBuilder metadata = new StringBuilder("Version: 1");
+ metadata.append(System.lineSeparator());
+ MetricsHeader metricsHeader =
+ new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(),
+ Util.getTaskClassVersion(config), Util.getSamzaVersion(), Util.getLocalHost().getHostName(),
+ System.currentTimeMillis(), System.currentTimeMillis());
+
+ MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics());
+ metadata.append("ContainerMetadata: ");
+ metadata.append(new String(new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+ FileUtil.writeToTextFile(metadataFile.get(), metadata.toString(), false);
+ } else {
+ log.info("Skipping writing metadata file.");
+ }
+ }
+
+
+ /**
+ * Create a pair of DiagnosticsManager and Reporter for the given jobName, jobId, containerId, and execEnvContainerId,
+ * if diagnostics is enabled.
+ * execEnvContainerId is the ID assigned to the container by the cluster manager (e.g., YARN).
+ */
+ public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildDiagnosticsManager(String jobName, String jobId,
+ String containerId, Optional<String> execEnvContainerId, Config config) {
+
+ Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = Optional.empty();
+
+ if (new JobConfig(config).getDiagnosticsEnabled()) {
+
+ // Diagnostic stream, producer, and reporter related parameters
+ String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS();
+ Integer publishInterval = new MetricsConfig(config).getMetricsSnapshotReporterInterval(diagnosticsReporterName);
+ String taskClassVersion = Util.getTaskClassVersion(config);
+ String samzaVersion = Util.getSamzaVersion();
+ String hostName = Util.getLocalHost().getHostName();
+ Option<String> blacklist = new MetricsConfig(config).getMetricsSnapshotReporterBlacklist(diagnosticsReporterName);
+ Option<String> diagnosticsReporterStreamName = new MetricsConfig(config).getMetricsSnapshotReporterStream(diagnosticsReporterName);
+
+ if (diagnosticsReporterStreamName.isEmpty()) {
+ throw new ConfigException("Missing required config: " + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM(), diagnosticsReporterName));
+ }
+
+ SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
+
+ Optional<String> diagnosticsSystemFactoryName = new SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
+ if (!diagnosticsSystemFactoryName.isPresent()) {
+ throw new SamzaException("Missing factory in config for system " + diagnosticsSystemStream.getSystem());
+ }
+
+ // Create a systemProducer for giving to diagnostic-reporter and diagnosticsManager
+ SystemFactory systemFactory = Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
+ SystemProducer systemProducer = systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap());
+ DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
+ samzaVersion, hostName, diagnosticsSystemStream, systemProducer, Duration.ofMillis(new TaskConfigJava(config).getShutdownMs()));
+
+ MetricsSnapshotReporter diagnosticsReporter =
+ new MetricsSnapshotReporter(systemProducer, diagnosticsSystemStream, publishInterval, jobName, jobId,
+ "samza-container-" + containerId, taskClassVersion, samzaVersion, hostName, new MetricsSnapshotSerdeV2(),
+ blacklist, ScalaJavaUtil.toScalaFunction(() -> System.currentTimeMillis()));
+
+ diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
+ }
+
+ return diagnosticsManagerReporterPair;
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
index e7eeb5e..8acb5e8 100644
--- a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
+++ b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
@@ -19,7 +19,9 @@
package org.apache.samza.util;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.metrics.MetricsReporter;
@@ -37,11 +39,19 @@ public class MetricsReporterLoader {
public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig config, String containerName) {
Map<String, MetricsReporter> metricsReporters = new HashMap<>();
- for (String metricsReporterName : JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava()) {
+ String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS();
+
+ // Exclude creation of diagnostics-reporter, because it is created manually in SamzaContainer (to allow sharing of
+ // sysProducer between reporter and diagnosticsManager
+ List<String> metricsReporterNames = JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava().
+ stream().filter(reporterName -> !reporterName.equals(diagnosticsReporterName)).collect(Collectors.toList());
+
+ for (String metricsReporterName : metricsReporterNames) {
String metricsFactoryClassName = config.getMetricsFactoryClass(metricsReporterName).get();
if (metricsFactoryClassName == null) {
throw new SamzaException(String.format("Metrics reporter %s missing .class config", metricsReporterName));
}
+
MetricsReporterFactory metricsReporterFactory = Util.getObj(metricsFactoryClassName, MetricsReporterFactory.class);
metricsReporters.put(metricsReporterName,
metricsReporterFactory.getMetricsReporter(metricsReporterName,
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index fd97707..24140d9 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -114,6 +114,10 @@ object JobConfig {
val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1
val SYSTEM_STREAM_PARTITION_MAPPER_FACTORY = "job.system.stream.partition.mapper.factory"
+ // Naming format and directory for container.metadata file
+ private val CONTAINER_METADATA_FILENAME_FORMAT = "%s.metadata" // Filename: <containerID>.metadata
+ private val CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY = "samza.log.dir"
+
implicit def Config2Job(config: Config) = new JobConfig(config)
/**
@@ -132,6 +136,18 @@ object JobConfig {
}
fwkPath
}
+
+  /**
+   * Resolves where the container metadata file lives: a file named `<exec-env-container-id>.metadata` inside the
+   * directory named by the `samza.log.dir` system property. The exec-env container id is the ID the cluster manager
+   * (e.g., YARN) assigns to the container, and it uniquely identifies one container lifecycle.
+   *
+   * @param execEnvContainerId the cluster-manager-assigned container id, if known
+   * @return the metadata file, or None when the system property or the container id is missing
+   */
+  def getMetadataFile(execEnvContainerId: Option[String]): Option[File] =
+    for {
+      logDir <- Option(System.getProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY))
+      id <- execEnvContainerId
+    } yield new File(logDir, JobConfig.CONTAINER_METADATA_FILENAME_FORMAT format id)
}
class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
@@ -207,8 +223,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
-
-
def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")
diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
index da29013..d41f5fb 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -49,7 +49,7 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
def getMetricsSnapshotReporterStream(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name)
- def getMetricsSnapshotReporterInterval(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name)
+  /** Configured interval for the named snapshot reporter, parsed as an Int; 60 when the key is not set. */
+  def getMetricsSnapshotReporterInterval(name: String): Int =
+    getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name).map(_.toInt).getOrElse(60)
def getMetricsSnapshotReporterBlacklist(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name)
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index ba5d932..9516327 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -96,11 +96,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case _ => TaskConfig.DEFAULT_COMMIT_MS
}
- def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
- case Some(ms) => Some(ms.toLong)
- case _ => None
- }
-
def getTaskClass = getOption(TaskConfig.TASK_CLASS)
def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 068d494..c38274b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -42,6 +42,7 @@ import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
import org.apache.samza.context._
+import org.apache.samza.diagnostics.DiagnosticsManager
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode}
import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
import org.apache.samza.serializers._
@@ -134,11 +135,14 @@ object SamzaContainer extends Logging {
applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
externalContextOption: Option[ExternalContext],
localityManager: LocalityManager = null,
- startpointManager: StartpointManager = null) = {
+ startpointManager: StartpointManager = null,
+ diagnosticsManager: Option[DiagnosticsManager] = Option.empty) = {
val config = jobContext.getConfig
val systemConfig = new SystemConfig(config)
val containerModel = jobModel.getContainers.get(containerId)
val containerName = "samza-container-%s" format containerId
+ val jobName = config.getName.get
+ val jobId = config.getJobId
val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
val containerPID = ManagementFactory.getRuntimeMXBean().getName()
@@ -651,7 +655,8 @@ object SamzaContainer extends Logging {
containerContext = containerContext,
applicationContainerContextOption = applicationContainerContextOption,
externalContextOption = externalContextOption,
- containerStorageManager = containerStorageManager)
+ containerStorageManager = containerStorageManager,
+ diagnosticsManager = diagnosticsManager)
}
/**
@@ -689,9 +694,10 @@ class SamzaContainer(
containerContext: ContainerContext,
applicationContainerContextOption: Option[ApplicationContainerContext],
externalContextOption: Option[ExternalContext],
- containerStorageManager: ContainerStorageManager) extends Runnable with Logging {
+ containerStorageManager: ContainerStorageManager,
+ diagnosticsManager: Option[DiagnosticsManager] = Option.empty) extends Runnable with Logging {
- val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
+ val shutdownMs = new TaskConfigJava(config).getShutdownMs
var shutdownHookThread: Thread = null
var jmxServer: JmxServer = null
@@ -775,6 +781,7 @@ class SamzaContainer(
shutdownHostStatisticsMonitor
shutdownProducers
shutdownOffsetManager
+ shutdownDiagnostics
shutdownMetrics
shutdownSecurityManger
shutdownAdmins
@@ -867,25 +874,9 @@ class SamzaContainer(
}
+  /** Starts the diagnostics manager, if this container was constructed with one; otherwise does nothing. */
def startDiagnostics {
- if (config.getDiagnosticsEnabled) {
- info("Starting diagnostics.")
-
- try {
- var diagnosticsAppender = Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", (classOf[SamzaContainerMetrics], this.metrics))
- info("Attached log4j diagnostics appender.")
- }
- catch {
- case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
- try {
- val diagnosticsAppender = Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", (classOf[SamzaContainerMetrics], this.metrics))
- info("Attached log4j2 diagnostics appender.")
- } catch {
- case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
- warn("Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream", e)
- }
- }
- }
- }
+ if (diagnosticsManager.isDefined) {
+ info("Starting diagnostics manager.")
+ diagnosticsManager.get.start()
}
}
@@ -1076,6 +1067,13 @@ class SamzaContainer(
offsetManager.stop
}
+  /** Stops the diagnostics manager, if this container was constructed with one. */
+  def shutdownDiagnostics {
+    diagnosticsManager.foreach { manager =>
+      info("Shutting down diagnostics manager.")
+      manager.stop()
+    }
+  }
+
def shutdownMetrics {
info("Shutting down metrics reporters.")
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 326156b..56f2ab6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -49,8 +49,6 @@ class SamzaContainerMetrics(
val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
- val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
-
def addStoresRestorationGauge(taskName: TaskName) {
taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format(taskName.toString), -1L))
}
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/BoundedList.java
similarity index 77%
rename from samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
rename to samza-core/src/main/scala/org/apache/samza/diagnostics/BoundedList.java
index d0fb326..ad76c28 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/BoundedList.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.metrics;
+package org.apache.samza.diagnostics;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
@@ -29,17 +30,17 @@ import org.apache.samza.util.TimestampedValue;
/**
- * A {@link ListGauge} is a {@link Metric} that buffers multiple instances of a type T in a list.
- * {@link ListGauge}s are useful for maintaining, recording, or collecting values over time.
+ * A {@link BoundedList} buffers multiple instances of a type T in a list.
+ * {@link BoundedList}s are useful for maintaining, recording, or collecting values over time.
* For example, a set of specific logging-events (e.g., errors).
*
* Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation.
- * Eviction happens during element addition or during reads of the ListGauge (getValues).
+ * Eviction happens during element addition or during reads of the BoundedList (getValues).
*
* All public methods are thread-safe.
*
*/
-public class ListGauge<T> implements Metric {
+public class BoundedList<T> {
private final String name;
private final Queue<TimestampedValue<T>> elements;
@@ -49,13 +50,13 @@ public class ListGauge<T> implements Metric {
private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60);
/**
- * Create a new {@link ListGauge} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
+ * Create a new {@link BoundedList} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
*
* @param name Name to be assigned
- * @param maxNumberOfItems The max number of items that can remain in the listgauge
- * @param maxStaleness The max staleness of items permitted in the listgauge
+ * @param maxNumberOfItems The max number of items that can remain in the list
+ * @param maxStaleness The max staleness of items permitted in the list
*/
- public ListGauge(String name, int maxNumberOfItems, Duration maxStaleness) {
+ public BoundedList(String name, int maxNumberOfItems, Duration maxStaleness) {
this.name = name;
this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>();
this.maxNumberOfItems = maxNumberOfItems;
@@ -63,15 +64,15 @@ public class ListGauge<T> implements Metric {
}
/**
- * Create a new {@link ListGauge} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes.
+ * Create a new {@link BoundedList} that auto evicts up to a max of 100 items and a max-staleness of 60 minutes.
* @param name Name to be assigned
*/
- public ListGauge(String name) {
+ public BoundedList(String name) {
this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS);
}
/**
- * Get the name assigned to this {@link ListGauge}
+ * Get the name assigned to this {@link BoundedList}
* @return the assigned name
*/
public String getName() {
@@ -100,11 +101,17 @@ public class ListGauge<T> implements Metric {
}
/**
- * {@inheritDoc}
+ * Removes every buffered element whose value is contained in {@code elementsToRemove}.
+ * Membership is tested with {@code elementsToRemove.contains(value)}, so any buffered value it reports as
+ * contained is removed, including one added after {@code elementsToRemove} was obtained.
+ * @param elementsToRemove collection of elements to remove.
*/
- @Override
- public void visit(MetricsVisitor visitor) {
- visitor.listGauge(this);
+  public void remove(Collection<T> elementsToRemove) {
+    // Iterator.remove is safe here because the backing queue supports removal during iteration.
+    for (Iterator<TimestampedValue<T>> iterator = this.elements.iterator(); iterator.hasNext(); ) {
+      if (elementsToRemove.contains(iterator.next().getValue())) {
+        iterator.remove();
+      }
+    }
}
/**
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
index 5a6be1e..cdae352 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
@@ -26,8 +26,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
/**
* This class encapsulates information related to an exception event that is useful for diagnostics.
- * It used to define container, task, and other metrics as
- * {@link org.apache.samza.metrics.ListGauge} of type {@link DiagnosticsExceptionEvent}.
*/
public class DiagnosticsExceptionEvent {
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
new file mode 100644
index 0000000..daaefe4
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.diagnostics;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+
+/**
+ * Responsible for publishing data to the diagnostic stream.
+ * Currently emits exception/error events obtained using a customer-appender that attaches to the root-logger.
+ */
+public class DiagnosticsManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsManager.class);
+ private static final Duration DEFAULT_PUBLISH_PERIOD = Duration.ofSeconds(60);
+ // Period size for pushing data to the diagnostic stream
+
+ private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager Thread-%d";
+ private static final String METRICS_GROUP_NAME = "org.apache.samza.container.SamzaContainerMetrics";
+ // Using SamzaContainerMetrics as the group name to maintain compatibility with existing diagnostics
+
+ // Parameters used for populating the MetricHeader when sending diagnostic-stream messages
+ private final String jobName;
+ private final String jobId;
+ private final String containerId;
+ private final String executionEnvContainerId;
+ private final String taskClassVersion;
+ private final String samzaVersion;
+ private final String hostname;
+ private final Instant resetTime;
+
+ private SystemProducer systemProducer; // SystemProducer for writing diagnostics data
+ private BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent
+ private final ScheduledExecutorService scheduler; // Scheduler for pushing data to the diagnostic stream
+ private final Duration terminationDuration; // duration to wait when terminating the scheduler
+ private final SystemStream diagnosticSystemStream;
+
+ public DiagnosticsManager(String jobName, String jobId, String containerId, String executionEnvContainerId,
+ String taskClassVersion, String samzaVersion, String hostname, SystemStream diagnosticSystemStream,
+ SystemProducer systemProducer, Duration terminationDuration) {
+ this.jobName = jobName;
+ this.jobId = jobId;
+ this.containerId = containerId;
+ this.executionEnvContainerId = executionEnvContainerId;
+ this.taskClassVersion = taskClassVersion;
+ this.samzaVersion = samzaVersion;
+ this.hostname = hostname;
+ resetTime = Instant.now();
+
+ this.systemProducer = systemProducer;
+ this.diagnosticSystemStream = diagnosticSystemStream;
+
+ this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters
+ this.scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build());
+ this.terminationDuration = terminationDuration;
+
+ try {
+
+ Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender",
+ JavaConverters.collectionAsScalaIterableConverter(
+ Collections.singletonList(new Tuple2<Class<?>, Object>(DiagnosticsManager.class, this)))
+ .asScala()
+ .toSeq());
+
+ LOG.info("Attached log4j diagnostics appender.");
+ } catch (ClassNotFoundException | InstantiationException | InvocationTargetException e) {
+ try {
+ Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender",
+ JavaConverters.collectionAsScalaIterableConverter(
+ Collections.singletonList(new Tuple2<Class<?>, Object>(DiagnosticsManager.class, this)))
+ .asScala()
+ .toSeq());
+ LOG.info("Attached log4j diagnostics appender.");
+ } catch (ClassNotFoundException | InstantiationException | InvocationTargetException ex) {
+
+ LOG.warn(
+ "Failed to instantiate neither diagnostic appender for sending error information to diagnostics stream.",
+ ex);
+ }
+ }
+ }
+
+ public void start() {
+ this.scheduler.scheduleWithFixedDelay(new DiagnosticsStreamPublisher(), 0, DEFAULT_PUBLISH_PERIOD.getSeconds(),
+ TimeUnit.SECONDS);
+ }
+
+ public void stop() throws InterruptedException {
+ scheduler.shutdown();
+
+ // Allow any scheduled publishes to finish, and block for termination
+ scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
+
+ if (!scheduler.isTerminated()) {
+ LOG.warn("Unable to terminate scheduler");
+ scheduler.shutdownNow();
+ }
+ }
+
+ public void addExceptionEvent(DiagnosticsExceptionEvent diagnosticsExceptionEvent) {
+ this.exceptions.add(diagnosticsExceptionEvent);
+ }
+
+ private class DiagnosticsStreamPublisher implements Runnable {
+
+ @Override
+ public void run() {
+ // Publish exception events if there are any
+ Collection<DiagnosticsExceptionEvent> exceptionList = exceptions.getValues();
+
+ if (!exceptionList.isEmpty()) {
+
+ // Create the metricHeader
+ MetricsHeader metricsHeader = new MetricsHeader(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId,
+ DiagnosticsUtil.class.getName(), taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(),
+ resetTime.toEpochMilli());
+
+ Map<String, Map<String, Object>> metricsMessage = new HashMap<>();
+ metricsMessage.putIfAbsent(METRICS_GROUP_NAME, new HashMap<>());
+ metricsMessage.get(METRICS_GROUP_NAME).put(exceptions.getName(), exceptionList);
+ MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricsMessage));
+
+ try {
+ systemProducer.send(DiagnosticsManager.class.getName(),
+ new OutgoingMessageEnvelope(diagnosticSystemStream, metricsHeader.getHost(), null,
+ new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+
+ // Remove exceptions from list after successful publish to diagnostics stream
+ exceptions.remove(exceptionList);
+ } catch (Exception e) {
+ LOG.error("Exception when flushing exceptions", e);
+ }
+ }
+ }
+ }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 30c8d1d..6d6b1b7 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -21,68 +21,45 @@ package org.apache.samza.metrics
import org.apache.samza.clustermanager.SamzaApplicationState
import org.apache.samza.config.{ClusterManagerConfig, Config}
-import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.util.Logging
-import org.apache.samza.util.MetricsReporterLoader
-
-import scala.collection.JavaConverters._
-
-object ContainerProcessManagerMetrics {
- val sourceName = "ApplicationMaster"
-}
/**
- * Responsible for wiring up Samza's metrics. Given that Samza has a metric
- * registry, we might as well use it. This class takes Samza's application
- * master state, and converts it to metrics.
- */
-class ContainerProcessManagerMetrics(
- val config: Config,
- val state: SamzaApplicationState,
- val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
-
- val jvm = new JvmMetrics(registry)
- val reporters = MetricsReporterLoader.getMetricsReporters(config, ContainerProcessManagerMetrics.sourceName).asScala
+ * Responsible for wiring up Samza's metrics. Given that Samza has a metric
+ * registry, we might as well use it. This class takes Samza's application
+ * master state, and converts it to metrics.
+ */
+class ContainerProcessManagerMetrics(val config: Config,
+ val state: SamzaApplicationState,
+ val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
val clusterManagerConfig = new ClusterManagerConfig(config)
- reporters.values.foreach(_.register(ContainerProcessManagerMetrics.sourceName, registry))
-
- def start() {
- val mRunningContainers = newGauge("running-containers", () => state.runningProcessors.size)
- val mNeededContainers = newGauge("needed-containers", () => state.neededProcessors.get())
- val mCompletedContainers = newGauge("completed-containers", () => state.completedProcessors.get())
- val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
- val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
- val mContainers = newGauge("container-count", () => state.processorCount)
- val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get())
- val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
- val mPreferredHostRequests = newGauge("preferred-host-requests", () => state.preferredHostRequests.get())
- val mAnyHostRequests = newGauge("any-host-requests", () => state.anyHostRequests.get())
- val mExpiredPreferredHostRequests = newGauge("expired-preferred-host-requests", () => state.expiredPreferredHostRequests.get())
- val mExpiredAnyHostRequests = newGauge("expired-any-host-requests", () => state.expiredAnyHostRequests.get())
-
- val mHostAffinityMatchPct = newGauge("host-affinity-match-pct", () => {
- val numPreferredHostRequests = state.preferredHostRequests.get()
- val numExpiredPreferredHostRequests = state.expiredPreferredHostRequests.get()
- if (numPreferredHostRequests != 0) {
- 100.00 * (numPreferredHostRequests - numExpiredPreferredHostRequests) / numPreferredHostRequests
- } else {
- 0L
- }
- })
-
- val mFailedStandbyAllocations = newGauge("failed-standby-allocations", () => state.failedStandbyAllocations.get())
- val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => state.failoversToAnyHost.get())
- val mFailoversToStandby = newGauge("failovers-to-standby", () => state.failoversToStandby.get())
+ val mRunningContainers = newGauge("running-containers", () => state.runningProcessors.size)
+ val mNeededContainers = newGauge("needed-containers", () => state.neededProcessors.get())
+ val mCompletedContainers = newGauge("completed-containers", () => state.completedProcessors.get())
+ val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
+ val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
+ val mContainers = newGauge("container-count", () => state.processorCount)
+ val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get())
+ val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
+ val mPreferredHostRequests = newGauge("preferred-host-requests", () => state.preferredHostRequests.get())
+ val mAnyHostRequests = newGauge("any-host-requests", () => state.anyHostRequests.get())
+ val mExpiredPreferredHostRequests = newGauge("expired-preferred-host-requests", () => state.expiredPreferredHostRequests.get())
+ val mExpiredAnyHostRequests = newGauge("expired-any-host-requests", () => state.expiredAnyHostRequests.get())
- val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
- val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)
+ val mHostAffinityMatchPct = newGauge("host-affinity-match-pct", () => {
+ val numPreferredHostRequests = state.preferredHostRequests.get()
+ val numExpiredPreferredHostRequests = state.expiredPreferredHostRequests.get()
+ if (numPreferredHostRequests != 0) {
+ 100.00 * (numPreferredHostRequests - numExpiredPreferredHostRequests) / numPreferredHostRequests
+ } else {
+ 0L
+ }
+ })
- jvm.start
- reporters.values.foreach(_.start)
- }
+ val mFailedStandbyAllocations = newGauge("failed-standby-allocations", () => state.failedStandbyAllocations.get())
+ val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => state.failoversToAnyHost.get())
+ val mFailoversToStandby = newGauge("failovers-to-standby", () => state.failoversToStandby.get())
- def stop() {
- reporters.values.foreach(_.stop)
- }
+ val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
+ val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)
}
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
index 21ec763..6e1fb8c 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -40,8 +40,6 @@ trait MetricsHelper {
def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value)
- def newListGauge[T](name: String) = metricGroup.newListGauge[T](name)
-
/**
* Specify a dynamic gauge that always returns the latest value when polled.
* The value closure must be thread safe, since metrics reporters may access
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index 75ed6aa..40ffee2 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -75,21 +75,6 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with
newTimer(group, new Timer(name))
}
- /**
- * Register a {@link org.apache.samza.metrics.ListGauge}
- *
- * @param group Group for this ListGauge
- * @param listGauge the ListGauge to register
- * @tparam T the type of the list gauge
- */
- def newListGauge[T](group: String, listGauge: ListGauge[T]) = {
- debug("Adding new listgauge %s %s %s." format(group, listGauge.getName, listGauge))
- putAndGetGroup(group).putIfAbsent(listGauge.getName, listGauge)
- val realListGauge = metrics.get(group).get(listGauge.getName).asInstanceOf[ListGauge[T]]
- listeners.foreach(_.onListGauge(group, realListGauge))
- realListGauge
- }
-
private def putAndGetGroup(group: String) = {
metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric])
metrics.get(group)
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index b48aaf7..84a487a 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -25,7 +25,6 @@ import java.util
import org.apache.samza.util.Logging
import javax.management.MBeanServer
import javax.management.ObjectName
-
import org.apache.samza.config.Config
import org.apache.samza.metrics._
@@ -49,8 +48,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry))))
- def listGauge[T](listGauge: ListGauge[T]) = registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, name, sources(registry))))
-
})
}
})
@@ -70,9 +67,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
def onTimer(group: String, timer: Timer) {
registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source)))
}
- def onListGauge(group: String, listGauge: ListGauge[_]) {
- registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, listGauge.getName, source)))
- }
}
} else {
warn("Trying to re-register a registry for source %s. Ignoring." format source)
@@ -105,20 +99,11 @@ trait JmxGaugeMBean extends MetricMBean {
def getValue(): Object
}
-trait JmxListGaugeMBean extends MetricMBean {
- def getValue(): util.Collection[Object]
-}
-
class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extends JmxGaugeMBean {
def getValue = g.getValue
def objectName = on
}
-class JmxListGauge(g: org.apache.samza.metrics.ListGauge[Object], on: ObjectName) extends JmxListGaugeMBean {
- def getValue = g.getValues
- def objectName = on
-}
-
trait JmxCounterMBean extends MetricMBean {
def getCount(): Long
}
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 8c13840..f9cf819 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -122,8 +122,6 @@ class MetricsSnapshotReporter(
case (name, metric) =>
if (!shouldIgnore(group, name)) {
metric.visit(new MetricsVisitor {
- // for listGauge the value is returned as a list, which gets serialized
- def listGauge[T](listGauge: ListGauge[T]) = { groupMsg.put(name, listGauge.getValues) }
def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double)
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 2191572..957cd6f 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -45,24 +45,6 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
val jobId = config
.getJobId
- val version =
- try {
- val taskClass = Option(new ApplicationConfig(config).getAppClass())
- .orElse(config.getTaskClass).get
- Option(Class.forName(taskClass).getPackage.getImplementationVersion).get
- } catch {
- case e: Exception => {
- warn("Unable to find implementation version in jar's meta info. Defaulting to 0.0.1.")
- "0.0.1"
- }
- }
-
- val samzaVersion = Option(classOf[MetricsSnapshotReporterFactory].getPackage.getImplementationVersion)
- .getOrElse({
- warn("Unable to find implementation samza version in jar's meta info. Defaulting to 0.0.1.")
- "0.0.1"
- })
-
val metricsSystemStreamName = config
.getMetricsSnapshotReporterStream(name)
.getOrElse(throw new SamzaException("No metrics stream defined in config."))
@@ -104,7 +86,6 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
val pollingInterval: Int = config
.getMetricsSnapshotReporterInterval(name)
- .getOrElse("60").toInt
info("Setting polling interval to %d" format pollingInterval)
@@ -118,8 +99,8 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
jobName,
jobId,
containerName,
- version,
- samzaVersion,
+ Util.getTaskClassVersion(config),
+ Util.getSamzaVersion(),
Util.getLocalHost.getHostName,
serde, blacklist)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index 0845b5c..98b1447 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -21,13 +21,13 @@
package org.apache.samza.util
-import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.io._
import java.nio.file._
import java.util.zip.CRC32
import org.apache.samza.util.Util.info
-object FileUtil {
+object FileUtil extends Logging {
/**
* Writes checksum & data to a file
* Checksum is pre-fixed to the data and is a 32-bit long type data.
@@ -51,11 +51,50 @@ object FileUtil {
}
//atomic swap of tmp and real offset file
+ swapFiles(tmpFile, file)
+ }
+
+ /**
+ * Writes the data to a text file
+ * @param file The file handle to write to
+ * @param data The data to be written to the file
+ * @param append true for appending data to file, false otherwise
+ * */
+ def writeToTextFile(file: File, data: String, append: Boolean): Unit = {
+
+ val tmpFilePath = file.getAbsolutePath + ".tmp"
+ var fileWriter: FileWriter = null
+ val tmpFile = new File(tmpFilePath)
+
+ //atomic swap of tmp and real file if we need to append
+ if (append) {
+ swapFiles(file, tmpFile)
+ }
+
+ try {
+ fileWriter = new FileWriter(tmpFile, append)
+ fileWriter.write(data)
+ } catch {
+ case e: Exception =>
+ error("Error in writing to file %s isAppend %s" format (file, append))
+ System.out.println(e)
+ } finally {
+ fileWriter.close()
+ }
+
+ //atomic swap of tmp and real file
+ swapFiles(tmpFile, file)
+ }
+
+ private def swapFiles(source: File, destination: File) : Unit = {
+ //atomic swap of source and destination file
try {
- Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
+ if (source.exists()) {
+ Files.move(source.toPath, destination.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
+ }
} catch {
case e: AtomicMoveNotSupportedException =>
- Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING)
+ Files.move(source.toPath, destination.toPath, StandardCopyOption.REPLACE_EXISTING)
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 1bb6648..5f5498d 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -30,6 +30,7 @@ import java.net.InetAddress
import java.net.NetworkInterface
import java.util.Random
+
import scala.collection.JavaConverters._
@@ -64,13 +65,37 @@ object Util extends Logging {
}
}
+ def getSamzaVersion(): String = {
+ Option(this.getClass.getPackage.getImplementationVersion)
+ .getOrElse({
+ warn("Unable to find implementation samza version in jar's meta info. Defaulting to 0.0.1.")
+ "0.0.1"
+ })
+ }
+
+ def getTaskClassVersion(config: Config): String = {
+ try {
+ val taskClass = Option(new ApplicationConfig(config).getAppClass())
+ .orElse(new TaskConfig(config).getTaskClass).get
+ Class.forName(taskClass).getPackage.getImplementationVersion
+ } catch {
+ case e: Exception => {
+ warn("Unable to find implementation version in jar's meta info. Defaulting to 0.0.1.")
+ "0.0.1"
+ }
+ }
+ }
+
/**
* Instantiate an object from given className, and given constructor parameters.
*/
- def getObj[T](className: String, constructorParams: (Class[_], Object)*) : T = {
+ @throws[ClassNotFoundException]
+ @throws[InstantiationException]
+ @throws[InvocationTargetException]
+ def getObj(className: String, constructorParams: (Class[_], Object)*) = {
try {
Class.forName(className).getDeclaredConstructor(constructorParams.map(x => x._1): _*)
- .newInstance(constructorParams.map(x => x._2): _*).asInstanceOf[T]
+ .newInstance(constructorParams.map(x => x._2): _*)
} catch {
case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
warn("Could not instantiate an instance for class %s." format className, e)
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 74ea12a..0400b2e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -20,6 +20,7 @@
package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableMap;
+import java.util.Optional;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -127,8 +128,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
AbstractContainerAllocator allocator =
(AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
@@ -149,8 +149,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
allocator =
(AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
@@ -170,8 +169,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
@@ -214,8 +212,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
taskManager.start();
Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
@@ -240,8 +237,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
@@ -290,8 +286,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
@@ -369,8 +364,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
@@ -415,8 +409,7 @@ public class TestContainerProcessManager {
new MapConfig(config),
state);
- ContainerProcessManager manager = new ContainerProcessManager(config, state, new MetricsRegistryMap(), allocator,
- clusterResourceManager);
+ ContainerProcessManager manager = new ContainerProcessManager(config, state, new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocator));
manager.start();
SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -443,8 +436,7 @@ public class TestContainerProcessManager {
cfg,
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
MockHostAwareContainerAllocator allocator = new MockHostAwareContainerAllocator(
clusterResourceManager,
@@ -510,8 +502,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
@@ -586,8 +577,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
@@ -675,8 +665,7 @@ public class TestContainerProcessManager {
new MapConfig(conf),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
taskManager.start();
SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
assertFalse(taskManager.shouldShutdown());
@@ -689,8 +678,7 @@ public class TestContainerProcessManager {
new MapConfig(config),
state,
new MetricsRegistryMap(),
- clusterResourceManager
- );
+ clusterResourceManager, Optional.empty());
taskManager1.start();
taskManager1.onResourceAllocated(container2);
}
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
index 9f84ce6..b0b65e0 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
-import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.diagnostics.BoundedList;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
@@ -40,17 +40,17 @@ public class TestMetricsSnapshotSerdeV2 {
new MetricsHeader("jobName", "i001", "container 0", "test container ID", "source", "300.14.25.1", "1", "1", 1,
1);
- ListGauge listGauge = new ListGauge<DiagnosticsExceptionEvent>("exceptions");
+ BoundedList boundedList = new BoundedList<DiagnosticsExceptionEvent>("exceptions");
DiagnosticsExceptionEvent diagnosticsExceptionEvent1 =
new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza exception", new RuntimeException("cause")),
new HashMap());
- listGauge.add(diagnosticsExceptionEvent1);
+ boundedList.add(diagnosticsExceptionEvent1);
String samzaContainerMetricsGroupName = "org.apache.samza.container.SamzaContainerMetrics";
Map<String, Map<String, Object>> metricMessage = new HashMap<>();
metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>());
- metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", listGauge.getValues());
+ metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", boundedList.getValues());
metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0);
MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java b/samza-core/src/test/scala/org/apache/samza/metrics/TestBoundedList.java
similarity index 65%
rename from samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
rename to samza-core/src/test/scala/org/apache/samza/metrics/TestBoundedList.java
index eb91012..698877e 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestBoundedList.java
@@ -21,40 +21,41 @@ package org.apache.samza.metrics;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
+import org.apache.samza.diagnostics.BoundedList;
import org.junit.Assert;
import org.junit.Test;
/**
- * Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge}
+ * Class to encapsulate test-cases for {@link BoundedList}
*/
-public class TestListGauge {
+public class TestBoundedList {
private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10);
- private <T> ListGauge<T> getListGaugeForTest() {
- return new ListGauge<T>("sampleListGauge", 10, Duration.ofSeconds(60));
+ private <T> BoundedList<T> getBoundedListForTest() {
+ return new BoundedList<T>("sampleListGauge", 10, Duration.ofSeconds(60));
}
@Test
public void basicTest() {
- ListGauge<String> listGauge = getListGaugeForTest();
- listGauge.add("sampleValue");
- Assert.assertEquals("Names should be the same", listGauge.getName(), "sampleListGauge");
- Assert.assertEquals("List sizes should match", listGauge.getValues().size(), 1);
- Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValues().contains("sampleValue"), true);
+ BoundedList<String> boundedList = getBoundedListForTest();
+ boundedList.add("sampleValue");
+ Assert.assertEquals("Names should be the same", boundedList.getName(), "sampleListGauge");
+ Assert.assertEquals("List sizes should match", boundedList.getValues().size(), 1);
+ Assert.assertEquals("BoundedList should contain sampleGauge", boundedList.getValues().contains("sampleValue"), true);
}
@Test
public void testSizeEnforcement() {
- ListGauge listGauge = getListGaugeForTest();
+ BoundedList boundedList = getBoundedListForTest();
for (int i = 15; i > 0; i--) {
- listGauge.add("v" + i);
+ boundedList.add("v" + i);
}
- Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValues().size(), 10);
+ Assert.assertEquals("List sizes should be as configured at creation time", boundedList.getValues().size(), 10);
int valueIndex = 10;
- Collection<String> currentList = listGauge.getValues();
+ Collection<String> currentList = boundedList.getValues();
Iterator iterator = currentList.iterator();
while (iterator.hasNext()) {
String gaugeValue = (String) iterator.next();
@@ -65,13 +66,13 @@ public class TestListGauge {
@Test
public void testThreadSafety() throws InterruptedException {
- ListGauge<Integer> listGauge = getListGaugeForTest();
+ BoundedList<Integer> boundedList = getBoundedListForTest();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
- listGauge.add(i);
+ boundedList.add(i);
}
}
});
@@ -80,7 +81,7 @@ public class TestListGauge {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
- listGauge.add(i);
+ boundedList.add(i);
}
}
});
@@ -91,8 +92,8 @@ public class TestListGauge {
thread1.join(THREAD_TEST_TIMEOUT.toMillis());
thread2.join(THREAD_TEST_TIMEOUT.toMillis());
- Assert.assertTrue("ListGauge should have the last 10 values", listGauge.getValues().size() == 10);
- for (Integer gaugeValue : listGauge.getValues()) {
+ Assert.assertTrue("BoundedList should have the last 10 values", boundedList.getValues().size() == 10);
+ for (Integer gaugeValue : boundedList.getValues()) {
Assert.assertTrue("Values should have the last 10 range", gaugeValue <= 100 && gaugeValue > 90);
}
}
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
index 31f0d47..95135b2 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
@@ -21,17 +21,15 @@ package org.apache.samza.logging.log4j;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
-import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
-import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides an in-memory appender that parses LoggingEvents to filter events relevant to diagnostics.
- * Currently, filters exception related events and update an exception metric ({@link ListGauge}) in
- * {@link SamzaContainerMetrics}.
+ * Currently, filters exception related events and updates the {@link DiagnosticsManager}.
*
* When used inconjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter} provides a
* stream of diagnostics-related events.
@@ -41,14 +39,14 @@ public class SimpleDiagnosticsAppender extends AppenderSkeleton {
// simple object to synchronize root logger attachment
private static final Object SYNCHRONIZATION_OBJECT = new Object();
- protected final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;
+ protected final DiagnosticsManager diagnosticsManager;
/**
* A simple log4j1.2.* appender, which attaches itself to the root logger.
* Attachment to the root logger is thread safe.
*/
- public SimpleDiagnosticsAppender(SamzaContainerMetrics samzaContainerMetrics) {
- this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
+ public SimpleDiagnosticsAppender(DiagnosticsManager diagnosticsManager) {
+ this.diagnosticsManager = diagnosticsManager;
this.setName(SimpleDiagnosticsAppender.class.getName());
synchronized (SYNCHRONIZATION_OBJECT) {
@@ -74,7 +72,7 @@ public class SimpleDiagnosticsAppender extends AppenderSkeleton {
new DiagnosticsExceptionEvent(loggingEvent.timeStamp, loggingEvent.getThrowableInformation().getThrowable(),
loggingEvent.getProperties());
- samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);
+ diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent);
LOG.debug("Received DiagnosticsExceptionEvent " + diagnosticsExceptionEvent);
} else {
LOG.debug("Received non-exception event with message " + loggingEvent.getMessage());
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java
index 37133db..932b3f7 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/SimpleDiagnosticsAppender.java
@@ -26,16 +26,14 @@ import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
-import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides an in-memory appender that parses LogEvents to filter events relevant to diagnostics.
- * Currently, filters exception related events and update an exception metric ({@link ListGauge}) in
- * {@link SamzaContainerMetrics}.
+ * Currently, filters exception related events and updates the {@link DiagnosticsManager}.
*
* When used inconjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter} provides a
* stream of diagnostics-related events.
@@ -45,11 +43,11 @@ public class SimpleDiagnosticsAppender extends AbstractAppender {
// simple object to synchronize root logger attachment
private static final Object SYNCHRONIZATION_OBJECT = new Object();
- protected final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;
+ protected final DiagnosticsManager diagnosticsManager;
- public SimpleDiagnosticsAppender(SamzaContainerMetrics samzaContainerMetrics) {
+ public SimpleDiagnosticsAppender(DiagnosticsManager diagnosticsManager) {
super(SimpleDiagnosticsAppender.class.getName(), null, null);
- this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
+ this.diagnosticsManager = diagnosticsManager;
synchronized (SYNCHRONIZATION_OBJECT) {
attachAppenderToLoggers(this);
@@ -82,7 +80,7 @@ public class SimpleDiagnosticsAppender extends AbstractAppender {
new DiagnosticsExceptionEvent(logEvent.getTimeMillis(), logEvent.getThrown(),
logEvent.getContextData().toMap());
- samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);
+ diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent);
LOG.debug("Received DiagnosticsExceptionEvent " + diagnosticsExceptionEvent);
} else {
LOG.debug("Received non-exception event with message " + logEvent.getMessage());
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
index 4dd4dd8..9a4b8c9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestMetricsRegistryImpl.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.Timer;
@@ -36,7 +35,6 @@ public class TestMetricsRegistryImpl implements org.apache.samza.metrics.Metrics
private Map<String, List<Counter>> counters = new HashMap<>();
private Map<String, List<Timer>> timers = new HashMap<>();
private Map<String, List<Gauge<?>>> gauges = new HashMap<>();
- private Map<String, List<ListGauge>> listGauges = new HashMap<>();
@Override
public Counter newCounter(String group, String name) {
@@ -107,11 +105,5 @@ public class TestMetricsRegistryImpl implements org.apache.samza.metrics.Metrics
return gauges;
}
- @Override
- public ListGauge newListGauge(String group, ListGauge listGauge) {
- listGauges.putIfAbsent(group, new ArrayList());
- listGauges.get(group).add(listGauge);
- return listGauge;
- }
}
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index 58adf26..9524145 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -45,7 +45,7 @@ public class YarnConfig extends MapConfig {
/**
* (Optional) JVM options to include in the command line when executing the AM
*/
- public static final String AM_JVM_OPTIONS = "yarn.am.opts";
+ public static final String AM_JVM_OPTIONS = "yClusterBasedJobCoordinator.javaarn.am.opts";
/**
* Determines whether a JMX server should be started on the AM
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 25b6855..56017f1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -46,9 +46,6 @@ object ApplicationMasterRestServlet {
metricsRegistry.getGroup(group).asScala.foreach {
case (name, metric) =>
metric.visit(new MetricsVisitor() {
- def listGauge[T](listGauge: ListGauge[T]) =
- groupMap.put(name, listGauge.getValues)
-
def counter(counter: Counter) =
groupMap.put(counter.getName, counter.getCount: lang.Long)