You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/06/19 02:59:36 UTC
git commit: SAMZA-35: Write Javadocs for all samza-api interfaces
Repository: incubator-samza
Updated Branches:
refs/heads/master 8c6002e58 -> 83c1cf439
SAMZA-35: Write Javadocs for all samza-api interfaces
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/83c1cf43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/83c1cf43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/83c1cf43
Branch: refs/heads/master
Commit: 83c1cf4399df2d2c698cd2fee8b3e5d07a94864d
Parents: 8c6002e
Author: Jakob Homan <jg...@apache.org>
Authored: Wed Jun 18 17:59:25 2014 -0700
Committer: Jakob Homan <jg...@apache.org>
Committed: Wed Jun 18 17:59:25 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/samza/Partition.java | 2 +-
.../org/apache/samza/checkpoint/Checkpoint.java | 4 +-
.../samza/checkpoint/CheckpointManager.java | 3 +-
.../checkpoint/CheckpointManagerFactory.java | 3 ++
.../java/org/apache/samza/config/Config.java | 3 ++
.../apache/samza/config/ConfigException.java | 3 ++
.../org/apache/samza/config/ConfigFactory.java | 9 ++++
.../org/apache/samza/config/ConfigRewriter.java | 3 +-
.../java/org/apache/samza/config/MapConfig.java | 3 ++
.../samza/container/SamzaContainerContext.java | 3 ++
.../org/apache/samza/job/ApplicationStatus.java | 3 ++
.../org/apache/samza/job/CommandBuilder.java | 4 ++
.../java/org/apache/samza/job/StreamJob.java | 32 +++++++++++++
.../org/apache/samza/job/StreamJobFactory.java | 3 ++
.../java/org/apache/samza/metrics/Counter.java | 3 +-
.../java/org/apache/samza/metrics/Gauge.java | 7 +++
.../apache/samza/metrics/MetricsRegistry.java | 36 ++++++++++++++
.../apache/samza/metrics/MetricsReporter.java | 4 ++
.../samza/metrics/MetricsReporterFactory.java | 3 ++
.../org/apache/samza/metrics/MetricsType.java | 34 -------------
.../apache/samza/metrics/MetricsVisitor.java | 4 +-
.../samza/metrics/ReadableMetricsRegistry.java | 4 ++
.../apache/samza/serializers/Deserializer.java | 4 +-
.../org/apache/samza/serializers/Serde.java | 7 +++
.../apache/samza/serializers/SerdeFactory.java | 4 ++
.../apache/samza/serializers/Serializer.java | 3 +-
.../org/apache/samza/storage/StorageEngine.java | 9 +++-
.../samza/storage/StorageEngineFactory.java | 7 +--
.../samza/system/OutgoingMessageEnvelope.java | 5 +-
.../org/apache/samza/system/SystemAdmin.java | 2 +-
.../org/apache/samza/system/SystemConsumer.java | 2 +-
.../org/apache/samza/system/SystemFactory.java | 4 ++
.../org/apache/samza/system/SystemProducer.java | 18 ++++++-
.../org/apache/samza/system/SystemStream.java | 6 ++-
.../samza/system/SystemStreamPartition.java | 2 +-
.../system/SystemStreamPartitionIterator.java | 4 ++
.../samza/system/chooser/MessageChooser.java | 50 ++++++++++----------
.../system/chooser/MessageChooserFactory.java | 3 ++
.../org/apache/samza/task/ClosableTask.java | 6 +++
.../java/org/apache/samza/task/StreamTask.java | 14 +++++-
.../java/org/apache/samza/task/TaskContext.java | 4 ++
.../org/apache/samza/task/TaskCoordinator.java | 9 ++++
.../task/TaskLifecycleListenerFactory.java | 3 ++
.../org/apache/samza/task/WindowableTask.java | 6 ++-
.../apache/samza/util/BlockingEnvelopeMap.java | 4 +-
.../main/java/org/apache/samza/util/Clock.java | 3 ++
.../apache/samza/util/NoOpMetricsRegistry.java | 4 ++
.../system/chooser/BootstrappingChooser.scala | 10 ++--
.../system/kafka/KafkaSystemProducer.scala | 5 --
49 files changed, 278 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/Partition.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/Partition.java b/samza-api/src/main/java/org/apache/samza/Partition.java
index ebb77ed..66d517d 100644
--- a/samza-api/src/main/java/org/apache/samza/Partition.java
+++ b/samza-api/src/main/java/org/apache/samza/Partition.java
@@ -20,7 +20,7 @@
package org.apache.samza;
/**
- * Used to represent a Samza stream partition.
+ * A numbered, ordered partition of a stream.
*/
public class Partition {
private final int partition;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
index dcf81bf..6fad1fa 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -25,7 +25,9 @@ import java.util.Map;
import org.apache.samza.system.SystemStream;
/**
- * Used to represent a checkpoint in the running of a Samza system.
+ * A checkpoint is a mapping of all the streams a job is consuming to the most recent offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
*/
public class Checkpoint {
private final Map<SystemStream, String> offsets;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 34f50fd..a6e1ba6 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -22,7 +22,8 @@ package org.apache.samza.checkpoint;
import org.apache.samza.Partition;
/**
- * Used as a standard interface for writing out checkpoints for a specified partition.
+ * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint}s to some
+ * implementation-specific location.
*/
public interface CheckpointManager {
public void start();
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
index 5ce8f35..a97ff09 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
@@ -22,6 +22,9 @@ package org.apache.samza.checkpoint;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
+/**
+ * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
+ */
public interface CheckpointManagerFactory {
public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/config/Config.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java
index c42c1c5..2048e90 100644
--- a/samza-api/src/main/java/org/apache/samza/config/Config.java
+++ b/samza-api/src/main/java/org/apache/samza/config/Config.java
@@ -27,6 +27,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * Store and retrieve named, typed values as configuration for classes extending this abstract class.
+ */
public abstract class Config implements Map<String, String> {
public Config subset(String prefix) {
return subset(prefix, true);
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/config/ConfigException.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/ConfigException.java b/samza-api/src/main/java/org/apache/samza/config/ConfigException.java
index b6ab549..7619d11 100644
--- a/samza-api/src/main/java/org/apache/samza/config/ConfigException.java
+++ b/samza-api/src/main/java/org/apache/samza/config/ConfigException.java
@@ -21,6 +21,9 @@ package org.apache.samza.config;
import org.apache.samza.SamzaException;
+/**
+ * Specific {@link org.apache.samza.SamzaException}s thrown from {@link org.apache.samza.config.Config}
+ */
public class ConfigException extends SamzaException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/config/ConfigFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/ConfigFactory.java b/samza-api/src/main/java/org/apache/samza/config/ConfigFactory.java
index d6d7584..8230f0e 100644
--- a/samza-api/src/main/java/org/apache/samza/config/ConfigFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/config/ConfigFactory.java
@@ -21,6 +21,15 @@ package org.apache.samza.config;
import java.net.URI;
+/**
+ * Build a {@link org.apache.samza.config.Config}
+ */
public interface ConfigFactory {
+
+ /**
+ * Build a specific Config.
+ * @param configUri Resource containing information necessary for this Config.
+ * @return Newly constructed Config.
+ */
Config getConfig(URI configUri);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/config/ConfigRewriter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/ConfigRewriter.java b/samza-api/src/main/java/org/apache/samza/config/ConfigRewriter.java
index 7248e8b..720126a 100644
--- a/samza-api/src/main/java/org/apache/samza/config/ConfigRewriter.java
+++ b/samza-api/src/main/java/org/apache/samza/config/ConfigRewriter.java
@@ -20,7 +20,8 @@
package org.apache.samza.config;
/**
- * Re-write the job's config before the job is submitted.
+ * A ConfigRewriter receives the job's config during job startup and may re-write it to provide new configs,
+ * remove existing configs, or audit and verify that the config is correct or permitted.
*/
public interface ConfigRewriter {
Config rewrite(String name, Config config);
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 337e921..1a83923 100644
--- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
@@ -26,6 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+/**
+ * A {@link org.apache.samza.config.Config} backed by a Java {@link java.util.Map}
+ */
public class MapConfig extends Config {
private final Map<String, String> map;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index 5aa7a8f..78d56a9 100644
--- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -24,6 +24,9 @@ import java.util.Collection;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
+/**
+ * A SamzaContainerContext maintains per-container information for the tasks it executes.
+ */
public class SamzaContainerContext {
public final String name;
public final Config config;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java b/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java
index 49052af..c41430a 100644
--- a/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java
+++ b/samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java
@@ -19,6 +19,9 @@
package org.apache.samza.job;
+/**
+ * Status of a {@link org.apache.samza.job.StreamJob} during and after its run.
+ */
public enum ApplicationStatus {
Running("Running"), SuccessfulFinish("SuccessfulFinish"), UnsuccessfulFinish("UnsuccessfulFinish"), New("New");
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index 5ec6433..cb40092 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -25,6 +25,10 @@ import org.apache.samza.system.SystemStreamPartition;
import java.util.Map;
import java.util.Set;
+/**
+ * CommandBuilders are used to customize the command necessary to launch a Samza Job for a particular framework,
+ * such as YARN or the LocalJobRunner.
+ */
public abstract class CommandBuilder {
protected Set<SystemStreamPartition> systemStreamPartitions;
protected String name;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/job/StreamJob.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/StreamJob.java b/samza-api/src/main/java/org/apache/samza/job/StreamJob.java
index f519949..d8bc84f 100644
--- a/samza-api/src/main/java/org/apache/samza/job/StreamJob.java
+++ b/samza-api/src/main/java/org/apache/samza/job/StreamJob.java
@@ -19,14 +19,46 @@
package org.apache.samza.job;
+/**
+ * A StreamJob runs Samza {@link org.apache.samza.task.StreamTask}s in its specific environment.
+ * Users generally do not need to implement a StreamJob themselves; rather, it is a framework-level
+ * interface meant for those extending Samza itself. This class, and its accompanying factory,
+ * allow Samza to run on other service providers besides YARN and LocalJob, such as Mesos or Sun Grid Engine.
+ */
public interface StreamJob {
+ /**
+ * Submit this job to be run.
+ * @return An instance of this job after it has been submitted.
+ */
StreamJob submit();
+ /**
+ * Kill this job immediately.
+ *
+ * @return An instance of this job after it has been killed.
+ */
StreamJob kill();
+ /**
+ * Block on this job until either it finishes or the timeout is reached.
+ *
+ * @param timeoutMs How many milliseconds to wait before returning, assuming the job has not yet finished
+ * @return {@link org.apache.samza.job.ApplicationStatus} of the job after finishing or timing out
+ */
ApplicationStatus waitForFinish(long timeoutMs);
+ /**
+ * Block on this job until either it transitions to the specified status or the timeout is reached.
+ *
+ * @param status Target {@link org.apache.samza.job.ApplicationStatus} to wait upon
+ * @param timeoutMs How many milliseconds to wait before returning, assuming the job has not transitioned to the specified value
+ * @return {@link org.apache.samza.job.ApplicationStatus} of the job after reaching the target status or timing out
+ */
ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs);
+ /**
+ * Get current {@link org.apache.samza.job.ApplicationStatus} of the job
+ * @return Current job status
+ */
ApplicationStatus getStatus();
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/job/StreamJobFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/StreamJobFactory.java b/samza-api/src/main/java/org/apache/samza/job/StreamJobFactory.java
index 4cdcc2c..2e8b159 100644
--- a/samza-api/src/main/java/org/apache/samza/job/StreamJobFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/job/StreamJobFactory.java
@@ -21,6 +21,9 @@ package org.apache.samza.job;
import org.apache.samza.config.Config;
+/**
+ * Build a {@link org.apache.samza.job.StreamJob}
+ */
public interface StreamJobFactory {
StreamJob getJob(Config config);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Counter.java b/samza-api/src/main/java/org/apache/samza/metrics/Counter.java
index 0838df3..6a85c66 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Counter.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Counter.java
@@ -22,7 +22,8 @@ package org.apache.samza.metrics;
import java.util.concurrent.atomic.AtomicLong;
/**
- * A counter is a metric that represents a cumulative value.
+ * A counter is a {@link org.apache.samza.metrics.Metric} that represents a cumulative value.
+ * For example, the number of messages processed since the container was started.
*/
public class Counter implements Metric {
private final String name;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
index 3335c15..c37bfbb 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
@@ -21,6 +21,13 @@ package org.apache.samza.metrics;
import java.util.concurrent.atomic.AtomicReference;
+/**
+ * A Gauge is a {@link org.apache.samza.metrics.Metric} that wraps some instance of T in a thread-safe
+ * reference and allows it to be set or retrieved. Gauges record specific values over time.
+ * For example, the current length of a queue or the size of a buffer.
+ *
+ * @param <T> Type of the value wrapped in the gauge for metering.
+ */
public class Gauge<T> implements Metric {
private final String name;
private AtomicReference<T> ref;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index 9df1ef6..1031e45 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -19,12 +19,48 @@
package org.apache.samza.metrics;
+/**
+ * A MetricsRegistry allows its users to create new {@link org.apache.samza.metrics.Metric}s and
+ * have those metrics wired to specific metrics systems, such as JMX, provided by {@link org.apache.samza.metrics.MetricsReporter}s.
+ * Those implementing Samza jobs use the MetricsRegistry to register metrics; the registry then handles
+ * the details of getting those metrics to each defined MetricsReporter.
+ *
+ * Users are free to organize their metrics into groups as needed for their jobs. {@link org.apache.samza.metrics.MetricsReporter}s
+ * will likely use the group field to group the user-defined metrics together.
+ */
public interface MetricsRegistry {
+ /**
+ * Create and register a new {@link org.apache.samza.metrics.Counter}
+ * @param group Group for this Counter
+ * @param name Name of to-be-created Counter
+ * @return New Counter instance
+ */
Counter newCounter(String group, String name);
+ /**
+ * Register existing {@link org.apache.samza.metrics.Counter} with this registry
+ * @param group Group for this Counter
+ * @param counter Existing Counter to register
+ * @return Counter that was registered
+ */
Counter newCounter(String group, Counter counter);
+ /**
+ * Create and register a new {@link org.apache.samza.metrics.Gauge}
+ * @param group Group for this Gauge
+ * @param name Name of to-be-created Gauge
+ * @param value Initial value for the Gauge
+ * @param <T> Type the Gauge will be wrapping
+ * @return Gauge that was created and registered
+ */
<T> Gauge<T> newGauge(String group, String name, T value);
+ /**
+ * Register an existing {@link org.apache.samza.metrics.Gauge}
+ * @param group Group for this Gauge
+ * @param value Existing Gauge to register
+ * @param <T> Type the Gauge will be wrapping
+ * @return Gauge that was registered
+ */
<T> Gauge<T> newGauge(String group, Gauge<T> value);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporter.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporter.java
index d52dfa9..f28746c 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporter.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporter.java
@@ -19,6 +19,10 @@
package org.apache.samza.metrics;
+/**
+ * A MetricsReporter is the interface that different metrics sinks, such as JMX, implement to receive
+ * metrics from the Samza framework and Samza jobs.
+ */
public interface MetricsReporter {
void start();
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
index 19eb91c..7807222 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
@@ -21,6 +21,9 @@ package org.apache.samza.metrics;
import org.apache.samza.config.Config;
+/**
+ * Build a {@link org.apache.samza.metrics.MetricsReporter}
+ */
public interface MetricsReporterFactory {
MetricsReporter getMetricsReporter(String name, String containerName, Config config);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/MetricsType.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsType.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsType.java
deleted file mode 100644
index e79d4e6..0000000
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsType.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.metrics;
-
-public enum MetricsType {
- MetricsCounter("MetricsCounter"), MetricsGauge("MetricsGauge");
-
- private final String str;
-
- private MetricsType(String str) {
- this.str = str;
- }
-
- public String toString() {
- return str;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
index fee0883..f4f756a 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
@@ -20,7 +20,9 @@
package org.apache.samza.metrics;
/**
- * A metric visitor visits a metric, before metrics are flushed to a metrics stream.
+ * A MetricsVisitor can be used to process each metric in a {@link org.apache.samza.metrics.ReadableMetricsRegistry},
+ * encapsulating the logic of what is to be done with each metric in the counter and gauge methods. This makes it easy
+ * to quickly process all of the metrics in a registry.
*/
public abstract class MetricsVisitor {
public abstract void counter(Counter counter);
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistry.java
index ebea426..b495e2a 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistry.java
@@ -22,6 +22,10 @@ package org.apache.samza.metrics;
import java.util.Map;
import java.util.Set;
+/**
+ * A ReadableMetricsRegistry is a {@link org.apache.samza.metrics.MetricsRegistry} that also
+ * allows read access to the metrics for which it is responsible.
+ */
public interface ReadableMetricsRegistry extends MetricsRegistry {
Set<String> getGroups();
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java b/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java
index fe72223..a55b46a 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java
@@ -22,7 +22,9 @@ package org.apache.samza.serializers;
/**
* A standard interface for Samza compatible deserializers, used for deserializing serialized objects back to their
* original form.
- * @param <T> The type of serialized object this deserializer should be implemented to deserialize.
+ *
+ * @param <T> The type of object implementations can read (deserialize)
+ *
*/
public interface Deserializer<T> {
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/Serde.java b/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
index fab1055..a59b8c2 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
@@ -19,5 +19,12 @@
package org.apache.samza.serializers;
+/**
+ * A Serde is a convenience type that implements both the {@link org.apache.samza.serializers.Serializer} and
+ * {@link org.apache.samza.serializers.Deserializer} interfaces, allowing it to both read and write data
+ * in its value type, T.
+ *
+ * @param <T> The type of serialized object implementations can both read and write
+ */
public interface Serde<T> extends Serializer<T>, Deserializer<T> {
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/serializers/SerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/SerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/SerdeFactory.java
index a41a922..d09defb 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/SerdeFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/SerdeFactory.java
@@ -21,6 +21,10 @@ package org.apache.samza.serializers;
import org.apache.samza.config.Config;
+/**
+ * Build an instance of {@link org.apache.samza.serializers.Serde}
+ * @param <T> The type of serialized object this factory's output can both read and write
+ */
public interface SerdeFactory<T> {
Serde<T> getSerde(String name, Config config);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java b/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java
index 932e9a5..8388090 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/Serializer.java
@@ -21,7 +21,8 @@ package org.apache.samza.serializers;
/**
* A standard interface for Samza compatible serializers, used for serializing objects to bytes.
- * @param <T> The type of object this serializer should be implemented to serialize.
+ *
+ * @param <T> The type of serialized object implementations can write
*/
public interface Serializer<T> {
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 96dec9b..747ee2b 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -26,14 +26,19 @@ import org.apache.samza.system.IncomingMessageEnvelope;
/**
* A storage engine for managing state maintained by a stream processor.
*
- * This interface does not specify any query capabilities, which, of course,
+ * <p>This interface does not specify any query capabilities, which, of course,
* would be query engine specific. Instead it just specifies the minimum
* functionality required to reload a storage engine from its changelog as well
* as basic lifecycle management.
*/
public interface StorageEngine {
- // TODO javadocs for StorageEngine.init
+ /**
+ * Restore the content of this StorageEngine from the changelog. Messages are provided
+ * in one {@link java.util.Iterator} and not deserialized for efficiency, allowing the
+ * implementation to optimize replay, if possible.
+ * @param envelopes Iterator over the changelog messages, not yet deserialized, to replay into this store.
+ */
void restore(Iterator<IncomingMessageEnvelope> envelopes);
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
index da57bf0..963ccf2 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
@@ -33,15 +33,16 @@ import org.apache.samza.task.MessageCollector;
*/
public interface StorageEngineFactory<K, V> {
- // TODO add Javadocs for MetricsRegistry and MessageCollector args
-
/**
* Create an instance of the given storage engine.
+ *
* @param storeName The name of the storage engine.
* @param storeDir The directory of the storage engine.
* @param keySerde The serializer to use for serializing keys when reading or writing to the store.
* @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
- * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog from.
+ * @param collector MessageCollector the storage engine uses to persist changes.
+ * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+ * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
* @param containerContext Information about the container in which the task is executing.
* @return The storage engine instance.
*/
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java
index c8ef980..d32d9df 100644
--- a/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java
@@ -20,8 +20,9 @@
package org.apache.samza.system;
/**
- * This class represents a message envelope that is sent by a StreamTask. It can be thought of as a complement to the
- * IncomingMessageEnvelope class.
+ * An OutgoingMessageEnvelope is sent to a specified {@link SystemStream} via the appropriate {@link org.apache.samza.system.SystemProducer}
+ * from the user's {@link org.apache.samza.task.StreamTask}. StreamTasks consume from their input streams via their
+ * process method and write to their output streams by sending OutgoingMessageEnvelopes via the provided {@link org.apache.samza.task.MessageCollector}.
*/
public class OutgoingMessageEnvelope {
private final SystemStream systemStream;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 3976253..571c606 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -23,7 +23,7 @@ import java.util.Map;
import java.util.Set;
/**
- * An interface that's use to interact with the underlying system to fetch
+ * Helper interface attached to an underlying system to fetch
* information about streams, partitions, offsets, etc. This interface is useful
* for providing utility methods that Samza needs in order to interact with a
* system.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
index a92e301..591f8fb 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
@@ -137,7 +137,7 @@ public interface SystemConsumer {
* Poll the SystemConsumer to get any available messages from the underlying
* system.
*
- * If the underlying implementation does not take care to adhere to the
+ * <p>If the underlying implementation does not take care to adhere to the
* timeout parameter, the SamzaContainer's performance will suffer
* drastically. Specifically, if poll blocks when it's not supposed to, it
* will block the entire main thread in SamzaContainer, and no messages will
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java b/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
index ae33e8e..a1aae37 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
@@ -22,6 +22,10 @@ package org.apache.samza.system;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
+/**
+ * Build the {@link org.apache.samza.system.SystemConsumer} and {@link org.apache.samza.system.SystemProducer} for
+ * a particular system, as well as the accompanying {@link org.apache.samza.system.SystemAdmin}.
+ */
public interface SystemFactory {
SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry);
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java b/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java
index 8967f57..9ce3032 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java
@@ -20,11 +20,21 @@
package org.apache.samza.system;
/**
- * Used as a standard interface for all producers of messages from a specified Samza source.
+ * SystemProducers are how Samza writes messages from {@link org.apache.samza.task.StreamTask}s to outside systems,
+ * such as messaging systems like Kafka, or file systems. Implementations are responsible for accepting messages
+ * and writing them to their backing systems.
*/
public interface SystemProducer {
+
+ /**
+ * Start the SystemProducer. After this method finishes, it should be ready to accept messages via the send method.
+ */
void start();
+ /**
+ * Stop the SystemProducer. After this method finishes, the system should have completed all necessary work, sent
+ * any remaining messages and will not receive any new calls to the send method.
+ */
void stop();
/**
@@ -40,5 +50,11 @@ public interface SystemProducer {
*/
void send(String source, OutgoingMessageEnvelope envelope);
+ /**
+ * If the SystemProducer buffers messages before sending them to its underlying system, it should flush those
+ * messages and leave no messages remaining to be sent.
+ *
+ * @param source String representing the source of the message.
+ */
void flush(String source);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/SystemStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStream.java b/samza-api/src/main/java/org/apache/samza/system/SystemStream.java
index 0265a2c..7f30e90 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStream.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStream.java
@@ -20,7 +20,11 @@
package org.apache.samza.system;
/**
- * Used to represent a Samza stream.
+ * Streams in Samza consist of both the stream name and the system to which the stream belongs.
+ * Systems are defined through the job config and have corresponding serdes, producers and
+ * consumers in order to deserialize, send to and retrieve from them. A stream name is dependent
+ * on its system, and may be the topic, queue name, file name, etc. as makes sense for a
+ * particular system.
*/
public class SystemStream {
protected final String system;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
index 5173ebd..bb69c38 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
@@ -22,7 +22,7 @@ package org.apache.samza.system;
import org.apache.samza.Partition;
/**
- * Aggregate object representing a partition of a Samza stream.
+ * Aggregate object representing both the {@link org.apache.samza.system.SystemStream} and {@link org.apache.samza.Partition}.
*/
public class SystemStreamPartition extends SystemStream {
protected final Partition partition;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
index 62a5eb7..9acfb10 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
@@ -29,6 +29,10 @@ import java.util.Queue;
import org.apache.samza.SamzaException;
+/**
+ * {@link java.util.Iterator} that wraps a {@link org.apache.samza.system.SystemConsumer} to iterate over
+ * the messages the consumer provides for the specified {@link org.apache.samza.system.SystemStreamPartition}.
+ */
public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEnvelope> {
private final SystemConsumer systemConsumer;
private final Map<SystemStreamPartition, Integer> fetchMap;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
index 6d2fa23..a8fe2b8 100644
--- a/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
+++ b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
@@ -25,46 +25,46 @@ import org.apache.samza.system.SystemStreamPartition;
/**
* MessageChooser is an interface for programmatic fine-grain control over
* stream consumption.
- *
- * Consider the case of a Samza task is consuming multiple streams where some
+ *
+ * <p>Consider the case of a Samza task consuming multiple streams, where some
* streams may be from live systems that have stricter SLA requirements and must
* always be prioritized over other streams that may be from batch systems.
* MessageChooser allows developers to inject message prioritization logic into
* the SamzaContainer.
- *
- * In general, the MessageChooser can be used to prioritize certain systems,
+ *
+ * <p>In general, the MessageChooser can be used to prioritize certain systems,
* streams or partitions over others. It can also be used to throttle certain
- * partitions if it chooses not to return messages even though they are
- * available when choose is invoked. The MessageChooser can also throttle the
- * entire SamzaContainer by performing a blocking operation, such as
- * Thread.sleep.
- *
- * The manner in which MessageChooser is used is:
- *
+ * partitions, by choosing not to return messages even though they are
+ * available. The MessageChooser can also throttle the entire SamzaContainer by
+ * performing a blocking operation, such as Thread.sleep.
+ *
+ * <p>The manner in which MessageChooser is used is:
+ *
* <ul>
- * <li>SystemConsumers buffers messages from all SystemStreamPartitions as they
+ * <li>SystemConsumers buffers messages from all SystemStreamPartitions as they
 * become available.</li>
 * <li>If MessageChooser has no messages for a given SystemStreamPartition, and
- * SystemConsumers has a message in its buffer for the SystemStreamPartition,
+ * SystemConsumers has a message in its buffer for the SystemStreamPartition,
* the MessageChooser will be updated once with the next message in the buffer.</li>
* <li>When SamzaContainer is ready to process another message, it calls
- * SystemConsumers.choose, which in-turn calls MessageChooser.choose.</li>
+ * SystemConsumers.choose, which in turn calls {@link MessageChooser#choose}.</li>
* </ul>
- *
- * Since the MessageChooser only receives one message at a time per
- * SystemStreamPartition, it can be used to order messages between different
+ *
+ * <p>Since the MessageChooser only receives one message at a time per
+ * {@link SystemStreamPartition}, it can be used to order messages between different
* SystemStreamPartitions, but it can't be used to re-order messages within a
* single SystemStreamPartition (a buffered sort). This must be done within a
* StreamTask.
- *
- * The contract between the MessageChooser and the SystemConsumers is:
- *
+ *
+ * <p>The contract between the MessageChooser and the SystemConsumers is:
+ *
* <ul>
- * <li>Update can be called multiple times before choose is called.</li>
- * <li>A null return from MessageChooser.choose means no envelopes should be
+ * <li>{@link #update(IncomingMessageEnvelope)} can be called multiple times
+ * before {@link #choose()} is called.</li>
+ * <li>If {@link #choose()} returns null, that means no envelopes should be
* processed at the moment.</li>
- * <li>A MessageChooser may elect to return null when choose is called, even if
- * unprocessed messages have been given by the update method.</li>
+ * <li>A MessageChooser may elect to return null when {@link #choose()} is
+ * called, even if unprocessed messages have been given by the update method.</li>
* <li>A MessageChooser will not have any of its in-memory state restored in the
* event of a failure.</li>
* <li>Blocking operations (such as Thread.sleep) will block all processing in
@@ -104,7 +104,7 @@ public interface MessageChooser {
void register(SystemStreamPartition systemStreamPartition, String offset);
/**
- * Notify the chooser that a new envelope is available for a processing.A
+ * Notify the chooser that a new envelope is available for processing. A
* MessageChooser will receive, at most, one outstanding envelope per
* system/stream/partition combination. For example, if update is called for
* partition 7 of kafka.mystream, then update will not be called with an
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
index 6442db9..1c3d3de 100644
--- a/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
@@ -22,6 +22,9 @@ package org.apache.samza.system.chooser;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
+/**
+ * Build an instance of a {@link org.apache.samza.system.chooser.MessageChooser}.
+ */
public interface MessageChooserFactory {
MessageChooser getChooser(Config config, MetricsRegistry registry);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java b/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
index a93cca0..36003b3 100644
--- a/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
+++ b/samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
@@ -19,6 +19,12 @@
package org.apache.samza.task;
+/**
+ * A ClosableTask augments {@link org.apache.samza.task.StreamTask}, allowing the method implementer to specify
+ * code that will be called when the StreamTask is being shut down by the framework, providing an opportunity to emit
+ * final metrics, clean up or close resources, etc. The close method is not guaranteed to be called in the event of a
+ * crash or hard kill of the process.
+ */
public interface ClosableTask {
void close() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/task/StreamTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/StreamTask.java b/samza-api/src/main/java/org/apache/samza/task/StreamTask.java
index 00d5efd..ed1d387 100644
--- a/samza-api/src/main/java/org/apache/samza/task/StreamTask.java
+++ b/samza-api/src/main/java/org/apache/samza/task/StreamTask.java
@@ -22,8 +22,18 @@ package org.apache.samza.task;
import org.apache.samza.system.IncomingMessageEnvelope;
/**
- * Used as a standard interface for all user processing tasks. Receives messages from a partition of a specified input
- * stream.
+ * A StreamTask is the basic interface on which Samza jobs are implemented. Developers writing Samza jobs begin by
+ * implementing this interface, which processes messages from the job's input streams and writes messages out to
+ * streams via the provided {@link org.apache.samza.task.MessageCollector}. A StreamTask may be augmented by
+ * implementing other interfaces, such as {@link org.apache.samza.task.InitableTask}, {@link org.apache.samza.task.WindowableTask},
+ * or {@link org.apache.samza.task.ClosableTask}.
+ * <p>
+ * The methods of a StreamTask, and of any add-on task interfaces it implements, are guaranteed to be called in a
+ * single-threaded fashion; no extra synchronization is necessary on the part of the class implementer. References to
+ * instances of {@link org.apache.samza.system.IncomingMessageEnvelope}s, {@link org.apache.samza.task.MessageCollector}s,
+ * and {@link org.apache.samza.task.TaskCoordinator} should not be held onto between calls; the framework may
+ * invalidate or reuse these objects once the call returns.
+ *
*/
public interface StreamTask {
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 611507e..7c1b085 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -22,6 +22,10 @@ package org.apache.samza.task;
import org.apache.samza.Partition;
import org.apache.samza.metrics.MetricsRegistry;
+/**
+ * A TaskContext provides information about the {@link org.apache.samza.task.StreamTask}, particularly during
+ * initialization in an {@link org.apache.samza.task.InitableTask} and during calls to {@link org.apache.samza.task.TaskLifecycleListener}s.
+ */
public interface TaskContext {
MetricsRegistry getMetricsRegistry();
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
index 5049b1b..6ff1a55 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
@@ -19,6 +19,15 @@
package org.apache.samza.task;
+/**
+ * TaskCoordinators are provided to the process methods of {@link org.apache.samza.task.StreamTask} implementations
+ * to allow the user code to request actions from the Samza framework, including committing the current checkpoints
+ * to configured {@link org.apache.samza.checkpoint.CheckpointManager}s or shutting down the task or all tasks within
+ * a container.
+ * <p>
+ * This interface may evolve over time.
+ * </p>
+ */
public interface TaskCoordinator {
/**
* Requests that Samza should write out a checkpoint, from which a task can restart
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
index 31f32bc..5ed7054 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
@@ -21,6 +21,9 @@ package org.apache.samza.task;
import org.apache.samza.config.Config;
+/**
+ * Build a {@link org.apache.samza.task.TaskLifecycleListener}.
+ */
public interface TaskLifecycleListenerFactory {
TaskLifecycleListener getLifecyleListener(String name, Config config);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/task/WindowableTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/WindowableTask.java b/samza-api/src/main/java/org/apache/samza/task/WindowableTask.java
index 1f48eec..62f4e39 100644
--- a/samza-api/src/main/java/org/apache/samza/task/WindowableTask.java
+++ b/samza-api/src/main/java/org/apache/samza/task/WindowableTask.java
@@ -20,7 +20,10 @@
package org.apache.samza.task;
/**
- * Used as a standard interface to allow user processing tasks to operate on specified time intervals, or "windows".
+ * Add-on interface to {@link org.apache.samza.task.StreamTask} implementations to add code which will be run on
+ * a specified time interval (via configuration). This can be used to implement direct time-based windowing or,
+ * with a frequent window interval, windowing based on some other condition which is checked during the call to
+ * window. The window method will be called even if no messages are received for a particular StreamTask.
*/
public interface WindowableTask {
/**
@@ -28,6 +31,7 @@ public interface WindowableTask {
* @param collector Contains the means of sending message envelopes to the output stream. The collector must only
* be used during the current call to the window method; you should not reuse the collector between invocations
* of this method.
+ *
* @param coordinator Manages execution of tasks.
* @throws Exception Any exception types encountered during the execution of the processing task.
*/
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 7171088..9503739 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -39,7 +39,9 @@ import org.apache.samza.system.SystemStreamPartition;
* BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
* Samza's poll() requirements make implementing SystemConsumers somewhat
* tricky. BlockingEnvelopeMap is provided to help other developers write
- * SystemConsumers.
+ * SystemConsumers. The intended audience is not those writing Samza jobs,
+ * but rather those extending Samza to consume from new types of stream providers
+ * and other systems.
* </p>
*
* <p>
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/util/Clock.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/Clock.java b/samza-api/src/main/java/org/apache/samza/util/Clock.java
index e1a77e6..db92114 100644
--- a/samza-api/src/main/java/org/apache/samza/util/Clock.java
+++ b/samza-api/src/main/java/org/apache/samza/util/Clock.java
@@ -19,6 +19,9 @@
package org.apache.samza.util;
+/**
+ * Mockable interface for tracking time.
+ */
public interface Clock {
long currentTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index 8bc0764..d7bc4a9 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -23,6 +23,10 @@ import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
+/**
+ * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
+ * recorded but a registry is still required.
+ */
public class NoOpMetricsRegistry implements MetricsRegistry {
@Override
public Counter newCounter(String group, String name) {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 91c1813..ad21a08 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -202,11 +202,11 @@ class BootstrappingChooser(
* offset 8 is chosen, not when the message with offset 10 is chosen.
*
* @param systemStreamPartition The SystemStreamPartition to check.
- * @param offset The offset to check.
- * @param newestOrUpcoming Whether to check the offset against the newest or
- * upcoming offset for the SystemStreamPartition.
- * Upcoming is useful during the registration phase,
- * and newest is useful during the choosing phase.
+ * @param offset The offset of the most recently chosen message.
+ * @param offsetType Whether to check the offset against the newest or
+ * upcoming offset for the SystemStreamPartition.
+ * Upcoming is useful during the registration phase,
+ * and newest is useful during the choosing phase.
*/
private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
val systemStream = systemStreamPartition.getSystemStream
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/83c1cf43/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 5fb7a20..c5fe462 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -19,15 +19,10 @@
package org.apache.samza.system.kafka
-import java.nio.ByteBuffer
-import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import grizzled.slf4j.Logging
import kafka.producer.KeyedMessage
import kafka.producer.Producer
-import kafka.producer.ProducerConfig
-import org.apache.samza.config.Config
-import org.apache.samza.util.KafkaUtil
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.util.ExponentialSleepStrategy