You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/06/10 13:06:45 UTC

[17/21] SAMZA-7: Rewrite Container section of docs to bring it up-to-date. Reviewed by Jakob Homan.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/api/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/api/overview.md b/docs/learn/documentation/0.7.0/api/overview.md
index 2d03ec3..489ba6d 100644
--- a/docs/learn/documentation/0.7.0/api/overview.md
+++ b/docs/learn/documentation/0.7.0/api/overview.md
@@ -3,100 +3,112 @@ layout: page
 title: API Overview
 ---
 
-When writing a stream processor for Samza, you must implement the StreamTask interface:
-
-```
-/** User processing tasks implement this. */
-public interface StreamTask {
-  void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception;
-}
-```
-
-When Samza runs your task, the process method will be called once for each message that Samza receives from your task's input streams. The envelope contains three things of importance: the message, the key, and the stream that the message came from:
-
-```
-/** This class is given to a StreamTask once for each message that it receives. */
-public class IncomingMessageEnvelope {
-  /** A deserialized message. */
-  Object getMessage() { ... }
-
-  /** A deserialized key. */
-  Object getKey() { ... }
-
-  /** The stream and partition that this message came from. */
-  SystemStreamPartition getSystemStreamPartition() { ... }
-}
-```
-<!-- TODO This description and example needs to be updated to match SystemStreamPartition. -->
-Note that the getSystemStreamPartition() method returns a SystemStreamPartition object, not a String, as you might expect. This is because a Samza Stream actually consists of a name, a system, and a stream. The name is what you call the stream in your Samza configuration file. The system is the name of the cluster that the stream came from (e.g. kafka-aggreate-tracking, databus, etc). The system name is also defined in your Samza configuration file. Lastly, the actual stream is available. For Kafka, this would be the Kafka topic's name.
-
-```
-/** A name/system/stream tuple that represents a Samza stream. */
-public class SystemStreamPartition extends SystemStream {
-
-  /** The system name that this stream is associated with. This is also
-      defined in a Samza job's configuration. */
-  public String getSystem() { ... }
-
-  /** The stream name for the system. */
-  public String getStream() { ... }
-
-  /** The partition within the stream. */
-    public Partition getPartition() { ... }
-}
-```
-
-To make this a bit clearer, let me show you an example. A Samza job's configuration might have:
-
-```
-# the stream
-streams.page-view-event.stream=PageViewEvent
-streams.page-view-event.system=kafka
-streams.page-view-event.serde=json
-
-# the system
-systems.kafka.samza.partition.manager=samza.stream.kafka.KafkaPartitionManager
-systems.kafka.samza.consumer.factory=samza.stream.kafka.KafkaConsumerFactory
-systems.kafka.samza.producer.factory=samza.stream.kafka.KafkaProducerFactory
-...
-```
-
-In this example, getName would return page-view-event, getSystem would return kafka, and getStream would return PageViewEvent. If you've got more than one input stream feeding into your StreamTask, you can use the getStream() object to determine what kind of message you've received.
-
-What about sending messages? If you take a look at the process() method in StreamTask, you'll see that you get a MessageCollector.
-
-```
-/** When a task wishes to send a message, it uses this class. */
-public interface MessageCollector {
-  void send(OutgoingMessageEnvelope envelope);
-}
-```
-
-The collector takes OutgoingMessageEnvelope, which allows tasks to supply a partition key when sending the message. The partition key, if supplied, is used to determine which partition of a stream a message is destined for.
-
-Please only use the MessageCollector object within the process() method. If you hold onto a MessageCollector instance and use it again later, your messages may not be sent correctly.
-
-```
-/** A message envelope that has a key. */
-public class OutgoingMessageEnvelope {
-  ...
-  Object getKey() { ... }
-}
-```
-
-And, putting it all together:
-
-<!-- TODO Verify that this example actually works. -->
-
-```
-class MyStreamerTask extends StreamTask {
-  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
-    val msg = envelope.getMessage.asInstanceOf[GenericRecord]
-    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg))
-  }
-}
-```
-
-This is a simplistic example that just reads from a stream, and sends the messages to SomeTopicPartitionedByUserId, partitioned by the message's user ID.
-
-## [TaskRunner &raquo;](../container/task-runner.html)
+When writing a stream processor for Samza, you must implement the [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) interface:
+
+    package com.example.samza;
+
+    public class MyTaskClass implements StreamTask {
+
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        // process message
+      }
+    }
+
+When you run your job, Samza will create several instances of your class (potentially on multiple machines). These task instances process the messages in the input streams.
+
+In your job's configuration you can tell Samza which streams you want to consume. An incomplete example could look like this (see the [configuration documentation](../jobs/configuration.html) for more detail):
+
+    # This is the class above, which Samza will instantiate when the job is run
+    task.class=com.example.samza.MyTaskClass
+
+    # Define a system called "kafka" (you can give it any name, and you can define
+    # multiple systems if you want to process messages from different sources)
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+
+    # The job consumes a topic called "PageViewEvent" from the "kafka" system
+    task.inputs=kafka.PageViewEvent
+
+    # Define a serializer/deserializer called "json" which parses JSON messages
+    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+    # Use the "json" serializer for messages in the "PageViewEvent" topic
+    systems.kafka.streams.PageViewEvent.samza.msg.serde=json
+
+For each message that Samza receives from the task's input streams, the *process* method is called. The [envelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three things of importance: the message, the key, and the stream that the message came from.
+
+    /** Every message that is delivered to a StreamTask is wrapped
+     * in an IncomingMessageEnvelope, which contains metadata about
+     * the origin of the message. */
+    public class IncomingMessageEnvelope {
+      /** A deserialized message. */
+      Object getMessage() { ... }
+
+      /** A deserialized key. */
+      Object getKey() { ... }
+
+      /** The stream and partition that this message came from. */
+      SystemStreamPartition getSystemStreamPartition() { ... }
+    }
+
+The key and value are declared as Object, and need to be cast to the correct type. If you don't configure a [serializer/deserializer](../container/serialization.html), they are typically Java byte arrays. A deserializer can convert these bytes into any other type, for example the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.
+
+The getSystemStreamPartition() method returns a [SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html) object, which tells you where the message came from. It consists of three parts:
+
+1. The *system*: the name of the system from which the message came, as defined in your job configuration. You can have multiple systems for input and/or output, each with a different name.
+2. The *stream name*: the name of the stream (topic, queue) within the source system. This is also defined in the job configuration.
+3. The [*partition*](javadocs/org/apache/samza/Partition.html): a stream is normally split into several partitions, and each partition is assigned to one StreamTask instance by Samza.
+
+The API looks like this:
+
+    /** A triple of system name, stream name and partition. */
+    public class SystemStreamPartition extends SystemStream {
+
+      /** The name of the system which provides this stream. It is
+          defined in the Samza job's configuration. */
+      public String getSystem() { ... }
+
+      /** The name of the stream/topic/queue within the system. */
+      public String getStream() { ... }
+
+      /** The partition within the stream. */
+      public Partition getPartition() { ... }
+    }
+
+In the example job configuration above, the system name is "kafka", the stream name is "PageViewEvent". (The name "kafka" isn't special &mdash; you can give your system any name you want.) If you have several input streams feeding into your StreamTask, you can use the SystemStreamPartition to determine what kind of message you've received.
+
+What about sending messages? If you take a look at the process() method in StreamTask, you'll see that you get a [MessageCollector](javadocs/org/apache/samza/task/MessageCollector.html).
+
+    /** When a task wishes to send a message, it uses this interface. */
+    public interface MessageCollector {
+      void send(OutgoingMessageEnvelope envelope);
+    }
+
+To send a message, you create an [OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) object and pass it to the message collector. At a minimum, the envelope specifies the message you want to send, and the system and stream name to send it to. Optionally you can specify the partitioning key and other parameters. See the [javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for details.
+
+**NOTE:** Please only use the MessageCollector object within the process() method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.
+
+For example, here's a simple task that splits each input message into words, and emits each word as a separate message:
+
+    public class SplitStringIntoWords implements StreamTask {
+
+      // Send outgoing messages to a stream called "words"
+      // in the "kafka" system.
+      private final SystemStream OUTPUT_STREAM =
+        new SystemStream("kafka", "words");
+
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        String message = (String) envelope.getMessage();
+
+        for (String word : message.split(" ")) {
+          // Use the word as the key, and 1 as the value.
+          // A second task can add the 1's to get the word count.
+          collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
+        }
+      }
+    }
+
+## [SamzaContainer &raquo;](../container/samza-container.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/comparisons/introduction.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/comparisons/introduction.md b/docs/learn/documentation/0.7.0/comparisons/introduction.md
index 77c04ca..6ecffcf 100644
--- a/docs/learn/documentation/0.7.0/comparisons/introduction.md
+++ b/docs/learn/documentation/0.7.0/comparisons/introduction.md
@@ -43,7 +43,7 @@ example above, where you have a stream of page-view events including the ID of t
 
 Now you can write a Samza job that takes both the page-view event and the changelog as inputs. You make sure that they are partitioned on the same key (e.g. user ID). Every time a changelog event comes in, you write the updated user information to the task's local storage. Every time a page-view event comes in, you read the current information about that user from local storage. That way, you can keep all the state local to a task, and never need to query a remote database.
 
-![Stateful Processing](/img/0.7.0/learn/documentation/introduction/samza_state.png)
+<img src="/img/0.7.0/learn/documentation/introduction/samza_state.png" alt="Stateful Processing" class="diagram-large">
 
 In effect, you now have a replica of the main database, broken into small partitions that are on the same machines as the Samza tasks. Database writes still need to go to the main database, but when you need to read from the database in order to process a message from the input stream, you can just consult the task's local state.
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/comparisons/mupd8.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/comparisons/mupd8.md b/docs/learn/documentation/0.7.0/comparisons/mupd8.md
index 78e7b64..2cc8ee6 100644
--- a/docs/learn/documentation/0.7.0/comparisons/mupd8.md
+++ b/docs/learn/documentation/0.7.0/comparisons/mupd8.md
@@ -57,7 +57,7 @@ This was motivated by our experience with Hadoop, where the data flow between jo
 
 MUPD8 executes all of its map/update processors inside a single JVM, using threads. This is memory-efficient, as the JVM memory overhead is shared across the threads.
 
-Samza uses a separate JVM for each stream processor container ([TaskRunner](../container/task-runner.html)). This has the disadvantage of using more memory compared to running multiple stream processing threads within a single JVM. However, the advantage is improved isolation between tasks, which can make them more reliable.
+Samza uses a separate JVM for each [stream processor container](../container/samza-container.html). This has the disadvantage of using more memory compared to running multiple stream processing threads within a single JVM. However, the advantage is improved isolation between tasks, which can make them more reliable.
 
 ### Isolation
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/checkpointing.md b/docs/learn/documentation/0.7.0/container/checkpointing.md
index 42b2e8d..6a93d84 100644
--- a/docs/learn/documentation/0.7.0/container/checkpointing.md
+++ b/docs/learn/documentation/0.7.0/container/checkpointing.md
@@ -3,47 +3,79 @@ layout: page
 title: Checkpointing
 ---
 
-On the [Streams](streams.html) page, on important detail was glossed over. When a TaskRunner instantiates a SystemConsumer for an input stream/partition pair, how does the TaskRunner know where in the stream to start reading messages. If you recall, Kafka has the concept of an offset, which defines a specific location in a topic/partition pair. The idea is that an offset can be used to reference a specific point in a stream/partition pair. When you read messages from Kafka, you can supply an offset to specify at which point you'd like to read from. After you read, you increment your offset, and get the next message.
+Samza provides fault-tolerant processing of streams: Samza guarantees that messages won't be lost, even if your job crashes, if a machine dies, if there is a network fault, or something else goes wrong. In order to provide this guarantee, Samza expects the [input system](streams.html) to meet the following requirements:
 
-![diagram](/img/0.7.0/learn/documentation/container/checkpointing.png)
+* The stream may be sharded into one or more *partitions*. Each partition is independent from the others, and is replicated across multiple machines (the stream continues to be available, even if a machine fails).
+* Each partition consists of a sequence of messages in a fixed order. Each message has an *offset*, which indicates its position in that sequence. Messages are always consumed sequentially within each partition.
+* A Samza job can start consuming the sequence of messages from any starting offset.
 
-This diagram looks the same as on the [Streams](streams.html) page, except that there are black lines at different points in each input stream/partition pair. These lines represent the current offset for each stream consumer. As the stream consumer reads, the offset increases, and moves closer to the "head" of the stream. The diagram also illustrates that the offsets might be staggered, such that some offsets are farther along in their stream/partition than others.
+Kafka meets these requirements, but they can also be implemented with other message broker systems.
 
-If a SystemConsumer is reading messages for a TaskRunner, and the TaskRunner stops for some reason (due to hardware failure, re-deployment, or whatever), the SystemConsumer should start where it left off when the TaskRunner starts back up again. We're able to do this because the Kafka broker is buffering messages on a remote server (the broker). Since the messages are available when we come back, we can just start from our last offset, and continue moving forward, without losing data.
+As described in the [section on SamzaContainer](samza-container.html), each task instance of your job consumes one partition of an input stream. Each task has a *current offset* for each input stream: the offset of the next message to be read from that stream partition. Every time a message is read from the stream, the current offset moves forwards.
 
-The TaskRunner supports this ability using something called a CheckpointManager.
+If a Samza container fails, it needs to be restarted (potentially on another machine) and resume processing where the failed container left off. In order to enable this, a container periodically checkpoints the current offset for each task instance.
 
-```
-public interface CheckpointManager {
-  void start();
+<img src="/img/0.7.0/learn/documentation/container/checkpointing.svg" alt="Illustration of checkpointing" class="diagram-large">
 
-  void register(Partition partition);
+When a Samza container starts up, it looks for the most recent checkpoint and starts consuming messages from the checkpointed offsets. If the previous container failed unexpectedly, the most recent checkpoint may be slightly behind the current offsets (i.e. the job may have consumed some more messages since the last checkpoint was written), but we can't know for sure. In that case, the job may process a few messages again.
 
-  void writeCheckpoint(Partition partition, Checkpoint checkpoint);
+This guarantee is called *at-least-once processing*: Samza ensures that your job doesn't miss any messages, even if containers need to be restarted. However, it is possible for your job to see the same message more than once when a container is restarted. We are planning to address this in a future version of Samza, but for now it is just something to be aware of: for example, if you are counting page views, a forcefully killed container could cause events to be slightly over-counted. You can reduce duplication by checkpointing more frequently, at a slight performance cost.
 
-  Checkpoint readLastCheckpoint(Partition partition);
+For checkpoints to be effective, they need to be written somewhere where they will survive faults. Samza allows you to write checkpoints to the file system (using FileSystemCheckpointManager), but that doesn't help if the machine fails and the container needs to be restarted on another machine. The most common configuration is to use Kafka for checkpointing. You can enable this with the following job configuration:
 
-  void stop();
-}
+    # The name of your job determines the name under which checkpoints will be stored
+    job.name=example-job
 
-public class Checkpoint {
-  private final Map<SystemStream, String> offsets;
-  ...
-}
-```
+    # Define a system called "kafka" for consuming and producing to a Kafka cluster
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 
-As you can see, the checkpoint manager provides a way to write out checkpoints for a given partition. Right now, the checkpoint contains a map. The map's keys are input stream names, and the map's values are each input stream's offset. Each checkpoint is managed per-partition. For example, if you have page-view-event and service-metric-event defined as streams in your Samza job's configuration file, the TaskRunner would supply a checkpoint with two keys in each checkpoint offset map (one for page-view-event and the other for service-metric-event).
+    # Declare that we want our job's checkpoints to be written to Kafka
+    task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+    task.checkpoint.system=kafka
 
-Samza provides two checkpoint managers: FileSystemCheckpointManager and KafkaCheckpointManager. The KafkaCheckpointManager is what you generally want to use. The way that KafkaCheckpointManager works is as follows: it writes checkpoint messages for your Samza job to a special Kafka topic. This topic's name is \_\_samza\_checkpoint\_your-job-name. For example, if you had a Samza job called "my-first-job", the Kafka topic would be called \_\_samza\_checkpoint\_my-first-job. This Kafka topic is partitioned identically to your Samza job's partition count. If your Samza job has 10 partitions, the checkpoint topic for your Samza job will also have 10 partitions. Every time that the TaskRunner calls writeCheckpoint, a checkpoint message will be sent to the partition that corresponds with the partition for the checkpoint that the TaskRunner wishes to write.
+    # By default, a checkpoint is written every 60 seconds. You can change this if you like.
+    task.commit.ms=60000
 
-![diagram](/img/0.7.0/learn/documentation/container/checkpointing-2.png)
+In this configuration, Samza writes checkpoints to a separate Kafka topic called \_\_samza\_checkpoint\_&lt;job-name&gt;\_&lt;job-id&gt; (in the example configuration above, the topic would be called \_\_samza\_checkpoint\_example-job\_1). Once per minute, Samza automatically sends a message to this topic, in which the current offsets of the input streams are encoded. When a Samza container starts up, it looks for the most recent offset message in this topic, and loads that checkpoint.
 
-When the TaskRunner starts for the first time, the offset behavior of the SystemConsumers is undefined. If the system for the SystemConsumer is Kafka, we fall back to the auto.offset.reset setting. If the auto.offset.reset is set to "largest", we start reading messages from the head of the stream; if it's set to "smallest", we read from the tail. If it's undefined, the TaskRunner will fail.
+Sometimes it can be useful to use checkpoints only for some input streams, but not for others. In this case, you can tell Samza to ignore any checkpointed offsets for a particular stream name:
 
-The TaskRunner calls writeCheckpoint at a windowed interval (e.g. every 10 seconds). If the TaskRunner fails, and restarts, it simply calls readLastCheckpoint for each partition. In the case of the KafkaCheckpointManager, this readLastCheckpoint method will read the last message that was written to the checkpoint topic for each partition in the job. One edge case to consider is that SystemConsumers might have read messages from an offset that hasn't yet been checkpointed. In such a case, when the TaskRunner reads the last checkpoint for each partition, the offsets might be farther back in the stream. When this happens, your StreamTask could get duplicate messages (i.e. it saw message X, failed, restarted at an offset prior to message X, and then reads message X again). Thus, Samza currently provides at least once messaging. You might get duplicates. Caveat emptor.
+    # Ignore any checkpoints for the topic "my-special-topic"
+    systems.kafka.streams.my-special-topic.samza.reset.offset=true
 
-<!-- TODO Add a link to the fault tolerance SEP when one exists -->
+    # Always start consuming "my-special-topic" at the oldest available offset
+    systems.kafka.streams.my-special-topic.samza.offset.default=oldest
 
-*Note that there are design proposals in the works to give exactly once messaging.*
+The following table explains the meaning of these configuration parameters:
+
+<table class="documentation">
+  <tr>
+    <th>Parameter name</th>
+    <th>Value</th>
+    <th>Meaning</th>
+  </tr>
+  <tr>
+    <td rowspan="2" class="nowrap">systems.&lt;system&gt;.<br>streams.&lt;stream&gt;.<br>samza.reset.offset</td>
+    <td>false (default)</td>
+    <td>When container starts up, resume processing from last checkpoint</td>
+  </tr>
+  <tr>
+    <td>true</td>
+    <td>Ignore checkpoint (pretend that no checkpoint is present)</td>
+  </tr>
+  <tr>
+    <td rowspan="2" class="nowrap">systems.&lt;system&gt;.<br>streams.&lt;stream&gt;.<br>samza.offset.default</td>
+    <td>upcoming (default)</td>
+    <td>When container starts and there is no checkpoint (or the checkpoint is ignored), only process messages that are published after the job is started, but no old messages</td>
+  </tr>
+  <tr>
+    <td>oldest</td>
+    <td>When container starts and there is no checkpoint (or the checkpoint is ignored), jump back to the oldest available message in the system, and consume all messages from that point onwards (most likely this means repeated processing of messages already seen previously)</td>
+  </tr>
+</table>
+
+Note that the example configuration above causes your tasks to start consuming from the oldest offset *every time a container starts up*. This is useful in case you have some in-memory state in your tasks that you need to rebuild from source data in an input stream. If you are using streams in this way, you may also find [bootstrap streams](streams.html) useful.
+
+If you want to make a one-off change to a job's consumer offsets, for example to force old messages to be processed again with a new version of your code, you can use CheckpointTool to manipulate the job's checkpoint. The tool is included in Samza's [source repository](/contribute/code.html) and documented in the README.
 
 ## [State Management &raquo;](state-management.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/event-loop.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/event-loop.md b/docs/learn/documentation/0.7.0/container/event-loop.md
index 1f9c51e..903ef90 100644
--- a/docs/learn/documentation/0.7.0/container/event-loop.md
+++ b/docs/learn/documentation/0.7.0/container/event-loop.md
@@ -3,87 +3,40 @@ layout: page
 title: Event Loop
 ---
 
-The event loop is the [TaskRunner](task-runner.html)'s single thread that is in charge of [reading](streams.html), [writing](streams.html), [metrics flushing](metrics.html), [checkpointing](checkpointing.html), and [windowing](windowing.html). It's the code that puts all of this stuff together. Each SystemConsumer reads messages on its own thread, but writes messages into a centralized message queue. The TaskRunner uses this queue to funnel all of the messages into the event loop. Here's how the event loop works:
+The event loop is the [container](samza-container.html)'s single thread that is in charge of [reading and writing messages](streams.html), [flushing metrics](metrics.html), [checkpointing](checkpointing.html), and [windowing](windowing.html).
 
-1. Take a message from the incoming message queue (the queue that the SystemConsumers are putting their messages)
-2. Give the message to the appropriate StreamTask by calling process() on it
-3. Call window() on the StreamTask if it implements WindowableTask, and the window time has expired
-4. Send any StreamTask output from the process() and window() call to the appropriate SystemProducers
-5. Write checkpoints for any partitions that are past the defined checkpoint commit interval
+Samza uses a single thread because every container is designed to use a single CPU core; to get more parallelism, simply run more containers. This uses a bit more memory than multithreaded parallelism, because each JVM has some overhead, but it simplifies resource management and improves isolation between jobs. This helps Samza jobs run reliably on a multitenant cluster, where many different jobs written by different people are running at the same time.
 
-The TaskRunner does this, in a loop, until it is shutdown.
+You are strongly discouraged from using threads in your job's code. Samza uses multiple threads internally for communicating with input and output streams, but all message processing and user code runs on a single-threaded event loop. In general, Samza is not thread-safe.
 
-### Lifecycle Listeners
-
-Sometimes, it's useful to receive notifications when a specific event happens in the TaskRunner. For example, you might want to reset some context in the container whenever a new message arrives. To accomplish this, Samza provides a TaskLifecycleListener interface, that can be wired into the TaskRunner through configuration.
-
-```
-/**
- * Used to get before/after notifications before initializing/closing all tasks
- * in a given container (JVM/process).
- */
-public interface TaskLifecycleListener {
-  /**
-   * Called before all tasks in TaskRunner are initialized.
-   */
-  void beforeInit(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are initialized.
-   */
-  void afterInit(Config config, TaskContext context);
-
-  /**
-   * Called before a message is processed by a task.
-   */
-  void beforeProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
+### Event Loop Internals
 
-  /**
-   * Called after a message is processed by a task.
-   */
-  void afterProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
+A container may have multiple [SystemConsumers](../api/javadocs/org/apache/samza/system/SystemConsumer.html) for consuming messages from different input systems. Each SystemConsumer reads messages on its own thread, but writes messages into a shared in-process message queue. The container uses this queue to funnel all of the messages into the event loop.
 
-  /**
-   * Called before all tasks in TaskRunner are closed.
-   */
-  void beforeClose(Config config, TaskContext context);
+The event loop works as follows:
 
-  /**
-   * Called after all tasks in TaskRunner are closed.
-   */
-  void afterClose(Config config, TaskContext context);
-}
-```
+1. Take a message from the incoming message queue;
+2. Give the message to the appropriate [task instance](samza-container.html) by calling process() on it;
+3. Call window() on the task instance if it implements [WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html), and the window time has expired;
+4. Send any output from the process() and window() calls to the appropriate [SystemProducers](../api/javadocs/org/apache/samza/system/SystemProducer.html);
+5. Write checkpoints for any tasks whose [commit interval](checkpointing.html) has elapsed.
 
-To use a TaskLifecycleListener, you must also create a factory for the listener.
+The container does this, in a loop, until it is shut down. Note that although there can be multiple task instances within a container (depending on the number of input stream partitions), their process() and window() methods are all called on the same thread, never concurrently on different threads.
 
-```
-public interface TaskLifecycleListenerFactory {
-  TaskLifecycleListener getLifecyleListener(String name, Config config);
-}
-```
-
-#### Configuring Lifecycle Listeners
-
-Once you have written a TaskLifecycleListener, and its factory, you can use the listener by configuring your Samza job with the following keys:
-
-* task.lifecycle.listeners: A CSV list of all defined listeners that should be used for the Samza job.
-* task.lifecycle.listener.&lt;listener name&gt;.class: A Java package and class name for a single listener factory.
+### Lifecycle Listeners
 
-For example, you might define a listener called "my-listener":
+Sometimes, you need to run your own code at specific points in a task's lifecycle. For example, you might want to set up some context in the container whenever a new message arrives, or perform some operations on startup or shutdown.
 
-```
-task.lifecycle.listener.my-listener.class=com.foo.bar.MyListenerFactory
-```
+To receive notifications when such events happen, you can implement the [TaskLifecycleListenerFactory](../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html) interface. It returns a [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html), whose methods are called by Samza at the appropriate times.
 
-And then enable it for your Samza job:
+You can then tell Samza to use your lifecycle listener with the following properties in your job configuration:
 
-```
-task.lifecycle.listeners=my-listener
-```
+    # Define a listener called "my-listener" by giving the factory class name
+    task.lifecycle.listener.my-listener.class=com.example.foo.MyListenerFactory
 
-Samza's container will create one instance of TaskLifecycleListener, and notify it whenever any of the events (shown in the API above) occur.
+    # Enable it in this job (multiple listeners can be separated by commas)
+    task.lifecycle.listeners=my-listener
 
-Borrowing from the example above, if we have a single Samza container processing partitions 0 and 2, and have defined a lifecycle listener called my-listener, then the Samza container will have a single instance of MyListener. MyListener's beforeInit, afterInit, beforeClose, and afterClose methods will all be called twice: one for each of the two partitions (e.g. beforeInit partition 0, before init partition 1, etc). The beforeProcess and afterProcess methods will simply be called once for each incoming message. The TaskContext is how the TaskLifecycleListener is able to tell which partition the event is for.
+The Samza container creates one instance of your [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html). If the container has multiple task instances (processing different input stream partitions), the beforeInit, afterInit, beforeClose and afterClose methods are called for each task instance. The [TaskContext](../api/javadocs/org/apache/samza/task/TaskContext.html) argument of those methods gives you more information about the partitions.
 
 ## [JMX &raquo;](jmx.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/index.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/index.md b/docs/learn/documentation/0.7.0/container/index.md
deleted file mode 100644
index 17751de..0000000
--- a/docs/learn/documentation/0.7.0/container/index.md
+++ /dev/null
@@ -1,18 +0,0 @@
----
-layout: page
-title: Container
----
-
-The API section shows how a Samza StreamTask is written. To execute a StreamTask, Samza has a container that wraps around your StreamTask. The Samza container manages:
-
-* Metrics
-* Configuration
-* Lifecycle
-* Checkpointing
-* State management
-* Serialization
-* Data transport
-
-This container is called a TaskRunner. Read on to learn more about Samza's TaskRunner.
-
-## [JobRunner &raquo;](job-runner.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/jmx.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/jmx.md b/docs/learn/documentation/0.7.0/container/jmx.md
index a9fcc77..9fce867 100644
--- a/docs/learn/documentation/0.7.0/container/jmx.md
+++ b/docs/learn/documentation/0.7.0/container/jmx.md
@@ -3,11 +3,20 @@ layout: page
 title: JMX
 ---
 
-The Samza TaskRunner (and YARN Application Master) will turn on JMX using a randomly selected port, since Samza is meant to be run in a distributed environment, and it's unknown which ports will be available prior to runtime. The port will be output in the TaskRunner's logs with a line like this:
+Samza's containers and YARN ApplicationMaster enable [JMX](http://docs.oracle.com/javase/tutorial/jmx/) by default. JMX can be used for managing the JVM; for example, you can connect to it using [jconsole](http://docs.oracle.com/javase/7/docs/technotes/guides/management/jconsole.html), which is included in the JDK.
 
-    2013-07-05 20:42:36 JmxServer [INFO] According to InetAddress.getLocalHost.getHostName we are Chriss-MacBook-Pro.local
-    2013-07-05 20:42:36 JmxServer [INFO] Started JmxServer port=64905 url=service:jmx:rmi:///jndi/rmi://Chriss-MacBook-Pro.local:64905/jmxrmi
+You can tell Samza to publish its internal [metrics](metrics.html), and any custom metrics you define, as JMX MBeans. To enable this, set the following properties in your job configuration:
 
-Any metrics that are registered in the TaskRunner will be visible through JMX. To toggle JMX, see the [Configuration](../jobs/configuration.html) section.
+    # Define a Samza metrics reporter called "jmx", which publishes to JMX
+    metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
+    # Use it (if you have multiple reporters defined, separate them with commas)
+    metrics.reporters=jmx
+
+JMX needs to be configured to use a specific port, but in a distributed environment, there is no way of knowing in advance which ports are available on the machines running your containers. Therefore Samza chooses the JMX port randomly. If you need to connect to it, you can find the port by looking in the container's logs, which report the JMX server details as follows:
+
+    2014-06-02 21:50:17 JmxServer [INFO] According to InetAddress.getLocalHost.getHostName we are samza-grid-1234.example.com
+    2014-06-02 21:50:17 JmxServer [INFO] Started JmxServer registry port=50214 server port=50215 url=service:jmx:rmi://localhost:50215/jndi/rmi://localhost:50214/jmxrmi
+    2014-06-02 21:50:17 JmxServer [INFO] If you are tunneling, you might want to try JmxServer registry port=50214 server port=50215 url=service:jmx:rmi://samza-grid-1234.example.com:50215/jndi/rmi://samza-grid-1234.example.com:50214/jmxrmi
 
 ## [JobRunner &raquo;](../jobs/job-runner.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/metrics.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/metrics.md b/docs/learn/documentation/0.7.0/container/metrics.md
index 078ce47..98acd81 100644
--- a/docs/learn/documentation/0.7.0/container/metrics.md
+++ b/docs/learn/documentation/0.7.0/container/metrics.md
@@ -3,52 +3,78 @@ layout: page
 title: Metrics
 ---
 
-Samza also provides a metrics library that the TaskRunner uses. It allows a StreamTask to create counters and gauges. The TaskRunner then writes those metrics to metrics infrastructure through a MetricsReporter implementation.
-
-```
-public class MyJavaStreamerTask implements StreamTask, InitableTask {
-  private static final Counter messageCount;
-
-  public void init(Config config, TaskContext context) {
-    this.messageCount = context.getMetricsRegistry().newCounter(MyJavaStreamerTask.class.toString(), "MessageCount");
-  }
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
-    System.out.println(envelope.getMessage().toString());
-    messageCount.inc();
-  }
-}
-```
-
-Samza's metrics design is very similar to Coda Hale's [metrics](https://github.com/codahale/metrics) library. It has two important interfaces:
-
-```
-public interface MetricsRegistry {
-  Counter newCounter(String group, String name);
-
-  <T> Gauge<T> newGauge(String group, String name, T value);
-}
-
-public interface MetricsReporter {
-  void start();
-
-  void register(String source, ReadableMetricsRegistry registry);
-
-  void stop();
-}
-```
-
-### MetricsRegistry
-
-When the TaskRunner starts up, as with StreamTask instantiation, it creates a MetricsRegistry for every partition in the Samza job.
-
-![diagram](/img/0.7.0/learn/documentation/container/metrics.png)
-
-The TaskRunner, itself, also gets a MetricsRegistry that it can use to create counters and gauges. It uses this registry to measure a lot of relevant metrics for itself.
-
-### MetricsReporter
-
-The other important interface is the MetricsReporter. The TaskRunner uses MetricsReporter implementations to send its MetricsRegistry counters and gauges to whatever metrics infrastructure the reporter uses. A Samza job's configuration determines which MetricsReporters the TaskRunner will use. Out of the box, Samza comes with a MetricsSnapshotReporter that sends JSON metrics messages to a Kafka topic, and a JmxReporter that records metrics to be read via JMX.
+When you're running a stream process in production, it's important that you have good metrics to track the health of your job. In order to make this easy, Samza includes a metrics library. It is used by Samza itself to generate some standard metrics such as message throughput, but you can also use it in your task code to emit custom metrics.
+
+Metrics can be reported in various ways. You can expose them via [JMX](jmx.html), which is useful in development. In production, a common setup is for each Samza container to periodically publish its metrics to a "metrics" Kafka topic, in which the metrics from all Samza jobs are aggregated. You can then consume this stream in another Samza job, and send the metrics to your favorite graphing system such as [Graphite](http://graphite.wikidot.com/).
+
+To set up your job to publish metrics to Kafka, you can use the following configuration:
+
+    # Define a metrics reporter called "snapshot", which publishes metrics
+    # every 60 seconds.
+    metrics.reporters=snapshot
+    metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+
+    # Tell the snapshot reporter to publish to a topic called "metrics"
+    # in the "kafka" system.
+    metrics.reporter.snapshot.stream=kafka.metrics
+
+    # Encode metrics data as JSON.
+    serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
+    systems.kafka.streams.metrics.samza.msg.serde=metrics
+
+With this configuration, the job automatically sends several JSON-encoded messages to the "metrics" topic in Kafka every 60 seconds. The messages look something like this:
+
+    {
+      "header": {
+        "container-name": "samza-container-0",
+        "host": "samza-grid-1234.example.com",
+        "job-id": "1",
+        "job-name": "my-samza-job",
+        "reset-time": 1401729000347,
+        "samza-version": "0.0.1",
+        "source": "Partition-2",
+        "time": 1401729420566,
+        "version": "0.0.1"
+      },
+      "metrics": {
+        "org.apache.samza.container.TaskInstanceMetrics": {
+          "commit-calls": 7,
+          "commit-skipped": 77948,
+          "kafka-input-topic-offset": "1606",
+          "messages-sent": 985,
+          "process-calls": 1093,
+          "send-calls": 985,
+          "send-skipped": 76970,
+          "window-calls": 0,
+          "window-skipped": 77955
+        }
+      }
+    }
+
+There is a separate message for each task instance, and the header tells you the job name, job ID and partition of the task. The metrics allow you to see how many messages have been processed and sent, the current offset in the input stream partition, and other details. There are additional messages which give you metrics about the JVM (heap size, garbage collection information, threads etc.), internal metrics of the Kafka producers and consumers, and more.
+
+It's easy to generate custom metrics in your job, if there's some value you want to keep an eye on. You can use Samza's built-in metrics framework, which is similar in design to Coda Hale's [metrics](http://metrics.codahale.com/) library. 
+
+You can register your custom metrics through a [MetricsRegistry](../api/javadocs/org/apache/samza/metrics/MetricsRegistry.html). Your stream task needs to implement [InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html), so that you can get the metrics registry from the [TaskContext](../api/javadocs/org/apache/samza/task/TaskContext.html). This simple example shows how to count the number of messages processed by your task:
+
+    public class MyJavaStreamTask implements StreamTask, InitableTask {
+      private Counter messageCount;
+
+      public void init(Config config, TaskContext context) {
+        this.messageCount = context
+          .getMetricsRegistry()
+          .newCounter(getClass().getName(), "message-count");
+      }
+
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        messageCount.inc();
+      }
+    }
+
+Samza currently supports two kind of metrics: [counters](../api/javadocs/org/apache/samza/metrics/Counter.html) and [gauges](../api/javadocs/org/apache/samza/metrics/Gauge.html). Use a counter when you want to track how often something occurs, and a gauge when you want to report the level of something, such as the size of a buffer. Each task instance (for each input stream partition) gets its own set of metrics.
+
+If you want to report metrics in some other way, e.g. directly to a graphing system (without going via Kafka), you can implement a [MetricsReporterFactory](../api/javadocs/org/apache/samza/metrics/MetricsReporterFactory.html) and reference it in your job configuration.
 
 ## [Windowing &raquo;](windowing.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/samza-container.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/samza-container.md b/docs/learn/documentation/0.7.0/container/samza-container.md
new file mode 100644
index 0000000..5d259c4
--- /dev/null
+++ b/docs/learn/documentation/0.7.0/container/samza-container.md
@@ -0,0 +1,66 @@
+---
+layout: page
+title: SamzaContainer
+---
+
+The SamzaContainer is responsible for managing the startup, execution, and shutdown of one or more [StreamTask](../api/overview.html) instances. Each SamzaContainer typically runs as an indepentent Java virtual machine. A Samza job can consist of several SamzaContainers, potentially running on different machines.
+
+When a SamzaContainer starts up, it does the following:
+
+1. Get last checkpointed offset for each input stream partition that it consumes
+2. Create a "reader" thread for every input stream partition that it consumes
+3. Start metrics reporters to report metrics
+4. Start a checkpoint timer to save your task's input stream offsets every so often
+5. Start a window timer to trigger your task's [window method](../api/javadocs/org/apache/samza/task/WindowableTask.html), if it is defined
+6. Instantiate and initialize your StreamTask once for each input stream partition
+7. Start an event loop that takes messages from the input stream reader threads, and gives them to your StreamTasks
+8. Notify lifecycle listeners during each one of these steps
+
+Let's start in the middle, with the instantiation of a StreamTask. The following sections of the documentation cover the other steps.
+
+### Tasks and Partitions
+
+When the container starts, it creates instances of the [task class](../api/overview.html) that you've written. If the task class implements the [InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html) interface, the SamzaContainer will also call the init() method.
+
+    /** Implement this if you want a callback when your task starts up. */
+    public interface InitableTask {
+      void init(Config config, TaskContext context);
+    }
+
+How many instances of your task class are created depends on the number of partitions in the job's input streams. If your Samza job has ten partitions, there will be ten instantiations of your task class: one for each partition. The first task instance will receive all messages for partition one, the second instance will receive all messages for partition two, and so on.
+
+<img src="/img/0.7.0/learn/documentation/container/tasks-and-partitions.svg" alt="Illustration of tasks consuming partitions" class="diagram-large">
+
+The number of partitions in the input streams is determined by the systems from which you are consuming. For example, if your input system is Kafka, you can specify the number of partitions when you create a topic.
+
+If a Samza job has more than one input stream, the number of task instances for the Samza job is the maximum number of partitions across all input streams. For example, if a Samza job is reading from PageViewEvent (12 partitions), and ServiceMetricEvent (14 partitions), then the Samza job would have 14 task instances (numbered 0 through 13). Task instances 12 and 13 only receive events from ServiceMetricEvent, because there is no corresponding PageViewEvent partition.
+
+There is [work underway](https://issues.apache.org/jira/browse/SAMZA-71) to make the assignment of partitions to tasks more flexible in future versions of Samza.
+
+### Containers and resource allocation
+
+Although the number of task instances is fixed &mdash; determined by the number of input partitions &mdash; you can configure how many containers you want to use for your job. If you are [using YARN](../jobs/yarn-jobs.html), the number of containers determines what CPU and memory resources are allocated to your job.
+
+If the data volume on your input streams is small, it might be sufficient to use just one SamzaContainer. In that case, Samza still creates one task instance per input partition, but all those tasks run within the same container. At the other extreme, you can create as many containers as you have partitions, and Samza will assign one task instance to each container.
+
+Each SamzaContainer is designed to use one CPU core, so it uses a [single-threaded event loop](event-loop.html) for execution. It's not advisable to create your own threads within a SamzaContainer. If you need more parallelism, please configure your job to use more containers.
+
+Any [state](state-management.html) in your job belongs to a task instance, not to a container. This is a key design decision for Samza's scalability: as your job's resource requirements grow and shrink, you can simply increase or decrease the number of containers, but the number of task instances remains unchanged. As you scale up or down, the same state remains attached to each task instance. Task instances may be moved from one container to another, and any persistent state managed by Samza will be moved with it. This allows the job's processing semantics to remain unchanged, even as you change the job's parallelism.
+
+### Joining multiple input streams
+
+If your job has multiple input streams, Samza provides a simple but powerful mechanism for joining data from different streams: each task instance receives messages from one partition of *each* of the input streams. For example, say you have two input streams, A and B, each with four partitions. Samza creates four task instances to process them, and assigns the partitions as follows:
+
+<table class="documentation">
+<tr><th>Task instance</th><th>Consumes stream partitions</th></tr>
+<tr><td>0</td><td>stream A partition 0, stream B partition 0</td></tr>
+<tr><td>1</td><td>stream A partition 1, stream B partition 1</td></tr>
+<tr><td>2</td><td>stream A partition 2, stream B partition 2</td></tr>
+<tr><td>3</td><td>stream A partition 3, stream B partition 3</td></tr>
+</table>
+
+Thus, if you want two events in different streams to be processed by the same task instance, you need to ensure they are sent to the same partition number. You can achieve this by using the same partitioning key when [sending the messages](../api/overview.html). Joining streams is discussed in detail in the [state management](state-management.html) section.
+
+There is one caveat in all of this: Samza currently assumes that a stream's partition count will never change. Partition splitting or repartitioning is not supported. If an input stream has N partitions, it is expected that it has always had, and will always have N partitions. If you want to re-partition a stream, you can write a job that reads messages from the stream, and writes them out to a new stream with the required number of partitions. For example, you could read messages from PageViewEvent, and write them to PageViewEventRepartition.
+
+## [Streams &raquo;](streams.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/serialization.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/serialization.md b/docs/learn/documentation/0.7.0/container/serialization.md
new file mode 100644
index 0000000..f6570c9
--- /dev/null
+++ b/docs/learn/documentation/0.7.0/container/serialization.md
@@ -0,0 +1,46 @@
+---
+layout: page
+title: Serialization
+---
+
+Every message that is read from or written to a [stream](streams.html) or a [persistent state store](state-management.html) needs to eventually be serialized to bytes (which are sent over the network or written to disk). There are various places where that serialization and deserialization can happen:
+
+1. In the client library: for example, the library for publishing to Kafka and consuming from Kafka supports pluggable serialization.
+2. In the task implementation: your [process method](../api/overview.html) can use raw byte arrays as inputs and outputs, and do any parsing and serialization itself.
+3. Between the two: Samza provides a layer of serializers and deserializers, or *serdes* for short.
+
+You can use whatever makes sense for your job; Samza doesn't impose any particular data model or serialization scheme on you. However, the cleanest solution is usually to use Samza's serde layer. The following configuration example shows how to use it.
+
+    # Define a system called "kafka"
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+
+    # The job is going to consume a topic called "PageViewEvent" from the "kafka" system
+    task.inputs=kafka.PageViewEvent
+
+    # Define a serde called "json" which parses/serializes JSON objects
+    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+    # Define a serde called "integer" which encodes an integer as 4 binary bytes (big-endian)
+    serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+    # For messages in the "PageViewEvent" topic, the key (the ID of the user viewing the page)
+    # is encoded as a binary integer, and the message is encoded as JSON.
+    systems.kafka.streams.PageViewEvent.samza.key.serde=integer
+    systems.kafka.streams.PageViewEvent.samza.msg.serde=json
+
+    # Define a key-value store which stores the most recent page view for each user ID.
+    # Again, the key is an integer user ID, and the value is JSON.
+    stores.LastPageViewPerUser.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+    stores.LastPageViewPerUser.changelog=kafka.last-page-view-per-user
+    stores.LastPageViewPerUser.key.serde=integer
+    stores.LastPageViewPerUser.msg.serde=json
+
+Each serde is defined with a factory class. Samza comes with several builtin serdes for UTF-8 strings, binary-encoded integers, JSON (requires the samza-serializers dependency) and more. You can also create your own serializer by implementing the [SerdeFactory](../api/javadocs/org/apache/samza/serializers/SerdeFactory.html) interface.
+
+The name you give to a serde (such as "json" and "integer" in the example above) is only for convenience in your job configuration; you can choose whatever name you like. For each stream and each state store, you can use the serde name to declare how messages should be serialized and deserialized.
+
+If you don't declare a serde, Samza simply passes objects through between your task instance and the system stream. In that case your task needs to send and receive whatever type of object the underlying client library uses.
+
+All the Samza APIs for sending and receiving messages are typed as *Object*. This means that you have to cast messages to the correct type before you can use them. It's a little bit more code, but it has the advantage that Samza is not restricted to any particular data model.
+
+## [Checkpointing &raquo;](checkpointing.html)