Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/29 04:57:48 UTC

[1/4] samza git commit: SAMZA-465: Use coordinator stream and eliminate CheckpointManager

Repository: samza
Updated Branches:
  refs/heads/master c37d75270 -> 23fb2e1c0


http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/perf/container-performance.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/container-performance.properties b/samza-test/src/main/config/perf/container-performance.properties
index 86b2d58..7dcab02 100644
--- a/samza-test/src/main/config/perf/container-performance.properties
+++ b/samza-test/src/main/config/perf/container-performance.properties
@@ -28,5 +28,15 @@ task.opts=-Xmx3072m -XX:+UseConcMarkSweepGC
 task.class=org.apache.samza.test.performance.TestPerformanceTask
 task.inputs=mock.stream0,mock.stream1,mock.stream2,mock.stream3,mock.stream4,mock.stream5,mock.stream6,mock.stream7,mock.stream8,mock.stream9,mock.stream10,mock.stream11,mock.stream12,mock.stream13,mock.stream14,mock.stream15,mock.stream16,mock.stream17,mock.stream18,mock.stream19,mock.stream20,mock.stream21,mock.stream22,mock.stream23,mock.stream24,mock.stream25,mock.stream26,mock.stream27,mock.stream28,mock.stream29,mock.stream30,mock.stream31,mock.stream32,mock.stream33,mock.stream34,mock.stream35,mock.stream36,mock.stream37,mock.stream38,mock.stream39,mock.stream40,mock.stream41,mock.stream42,mock.stream43,mock.stream44,mock.stream45,mock.stream46,mock.stream47,mock.stream48,mock.stream49,mock.stream50,mock.stream51,mock.stream52,mock.stream53,mock.stream54,mock.stream55,mock.stream56,mock.stream57,mock.stream58,mock.stream59,mock.stream60,mock.stream61,mock.stream62,mock.stream63,mock.stream64,mock.stream65,mock.stream66,mock.stream67,mock.stream68,mock.stream69,mock.stream70,m
 ock.stream71,mock.stream72,mock.stream73,mock.stream74,mock.stream75,mock.stream76,mock.stream77,mock.stream78,mock.stream79,mock.stream80,mock.stream81,mock.stream82,mock.stream83,mock.stream84,mock.stream85,mock.stream86,mock.stream87,mock.stream88,mock.stream89,mock.stream90,mock.stream91,mock.stream92,mock.stream93,mock.stream94,mock.stream95,mock.stream96,mock.stream97,mock.stream98,mock.stream99,mock.stream100,mock.stream101,mock.stream102,mock.stream103,mock.stream104,mock.stream105,mock.stream106,mock.stream107,mock.stream108,mock.stream109,mock.stream110,mock.stream111,mock.stream112,mock.stream113,mock.stream114,mock.stream115,mock.stream116,mock.stream117,mock.stream118,mock.stream119,mock.stream120,mock.stream121,mock.stream122,mock.stream123,mock.stream124,mock.stream125,mock.stream126,mock.stream127,mock.stream128,mock.stream129,mock.stream130,mock.stream131,mock.stream132,mock.stream133,mock.stream134,mock.stream135,mock.stream136,mock.stream137,mock.stream138,mock.st
 ream139,mock.stream140,mock.stream141,mock.stream142,mock.stream143,mock.stream144,mock.stream145,mock.stream146,mock.stream147,mock.stream148,mock.stream149,mock.stream150,mock.stream151,mock.stream152,mock.stream153,mock.stream154,mock.stream155,mock.stream156,mock.stream157,mock.stream158,mock.stream159,mock.stream160,mock.stream161,mock.stream162,mock.stream163,mock.stream164,mock.stream165,mock.stream166,mock.stream167,mock.stream168,mock.stream169,mock.stream170,mock.stream171,mock.stream172,mock.stream173,mock.stream174,mock.stream175,mock.stream176,mock.stream177,mock.stream178,mock.stream179,mock.stream180,mock.stream181,mock.stream182,mock.stream183,mock.stream184,mock.stream185,mock.stream186,mock.stream187,mock.stream188,mock.stream189,mock.stream190,mock.stream191,mock.stream192,mock.stream193,mock.stream194,mock.stream195,mock.stream196,mock.stream197,mock.stream198,mock.stream199,mock.stream200,mock.stream201,mock.stream202,mock.stream203,mock.stream204,mock.stream205
 ,mock.stream206,mock.stream207,mock.stream208,mock.stream209,mock.stream210,mock.stream211,mock.stream212,mock.stream213,mock.stream214,mock.stream215,mock.stream216,mock.stream217,mock.stream218,mock.stream219,mock.stream220,mock.stream221,mock.stream222,mock.stream223,mock.stream224,mock.stream225,mock.stream226,mock.stream227,mock.stream228,mock.stream229,mock.stream230,mock.stream231,mock.stream232,mock.stream233,mock.stream234,mock.stream235,mock.stream236,mock.stream237,mock.stream238,mock.stream239,mock.stream240,mock.stream241,mock.stream242,mock.stream243,mock.stream244,mock.stream245,mock.stream246,mock.stream247,mock.stream248,mock.stream249,mock.stream250,mock.stream251,mock.stream252,mock.stream253,mock.stream254,mock.stream255,mock.stream256,mock.stream257,mock.stream258,mock.stream259,mock.stream260,mock.stream261,mock.stream262,mock.stream263,mock.stream264,mock.stream265,mock.stream266,mock.stream267,mock.stream268,mock.stream269,mock.stream270,mock.stream271,mock.s
 tream272,mock.stream273,mock.stream274,mock.stream275,mock.stream276,mock.stream277,mock.stream278,mock.stream279,mock.stream280,mock.stream281,mock.stream282,mock.stream283,mock.stream284,mock.stream285,mock.stream286,mock.stream287,mock.stream288,mock.stream289,mock.stream290,mock.stream291,mock.stream292,mock.stream293,mock.stream294,mock.stream295,mock.stream296,mock.stream297,mock.stream298,mock.stream299,mock.stream300,mock.stream301,mock.stream302,mock.stream303,mock.stream304,mock.stream305,mock.stream306,mock.stream307,mock.stream308,mock.stream309,mock.stream310,mock.stream311,mock.stream312,mock.stream313,mock.stream314,mock.stream315,mock.stream316,mock.stream317,mock.stream318,mock.stream319,mock.stream320,mock.stream321,mock.stream322,mock.stream323,mock.stream324,mock.stream325,mock.stream326,mock.stream327,mock.stream328,mock.stream329,mock.stream330,mock.stream331,mock.stream332,mock.stream333,mock.stream334,mock.stream335,mock.stream336,mock.stream337,mock.stream33
 8,mock.stream339,mock.stream340,mock.stream341,mock.stream342,mock.stream343,mock.stream344,mock.stream345,mock.stream346,mock.stream347,mock.stream348,mock.stream349,mock.stream350,mock.stream351,mock.stream352,mock.stream353,mock.stream354,mock.stream355,mock.stream356,mock.stream357,mock.stream358,mock.stream359,mock.stream360,mock.stream361,mock.stream362,mock.stream363,mock.stream364,mock.stream365,mock.stream366,mock.stream367,mock.stream368,mock.stream369,mock.stream370,mock.stream371,mock.stream372,mock.stream373,mock.stream374,mock.stream375,mock.stream376,mock.stream377,mock.stream378,mock.stream379,mock.stream380,mock.stream381,mock.stream382,mock.stream383,mock.stream384,mock.stream385,mock.stream386,mock.stream387,mock.stream388,mock.stream389,mock.stream390,mock.stream391,mock.stream392,mock.stream393,mock.stream394,mock.stream395,mock.stream396,mock.stream397,mock.stream398,mock.stream399,mock.stream400,mock.stream401,mock.stream402,mock.stream403,mock.stream404,mock.
 stream405,mock.stream406,mock.stream407,mock.stream408,mock.stream409,mock.stream410,mock.stream411,mock.stream412,mock.stream413,mock.stream414,mock.stream415,mock.stream416,mock.stream417,mock.stream418,mock.stream419,mock.stream420,mock.stream421,mock.stream422,mock.stream423,mock.stream424,mock.stream425,mock.stream426,mock.stream427,mock.stream428,mock.stream429,mock.stream430,mock.stream431,mock.stream432,mock.stream433,mock.stream434,mock.stream435,mock.stream436,mock.stream437,mock.stream438,mock.stream439,mock.stream440,mock.stream441,mock.stream442,mock.stream443,mock.stream444,mock.stream445,mock.stream446,mock.stream447,mock.stream448,mock.stream449,mock.stream450,mock.stream451,mock.stream452,mock.stream453,mock.stream454,mock.stream455,mock.stream456,mock.stream457,mock.stream458,mock.stream459,mock.stream460,mock.stream461,mock.stream462,mock.stream463,mock.stream464,mock.stream465,mock.stream466,mock.stream467,mock.stream468,mock.stream469,mock.stream470,mock.stream4
 71,mock.stream472,mock.stream473,mock.stream474,mock.stream475,mock.stream476,mock.stream477,mock.stream478,mock.stream479,mock.stream480,mock.stream481,mock.stream482,mock.stream483,mock.stream484,mock.stream485,mock.stream486,mock.stream487,mock.stream488,mock.stream489,mock.stream490,mock.stream491,mock.stream492,mock.stream493,mock.stream494,mock.stream495,mock.stream496,mock.stream497,mock.stream498,mock.stream499,mock.stream500,mock.stream501,mock.stream502,mock.stream503,mock.stream504,mock.stream505,mock.stream506,mock.stream507,mock.stream508,mock.stream509,mock.stream510,mock.stream511,mock.stream512,mock.stream513,mock.stream514,mock.stream515,mock.stream516,mock.stream517,mock.stream518,mock.stream519,mock.stream520,mock.stream521,mock.stream522,mock.stream523,mock.stream524,mock.stream525,mock.stream526,mock.stream527,mock.stream528,mock.stream529,mock.stream530,mock.stream531,mock.stream532,mock.stream533,mock.stream534,mock.stream535,mock.stream536,mock.stream537,mock
 .stream538,mock.stream539,mock.stream540,mock.stream541,mock.stream542,mock.stream543,mock.stream544,mock.stream545,mock.stream546,mock.stream547,mock.stream548,mock.stream549,mock.stream550,mock.stream551,mock.stream552,mock.stream553,mock.stream554,mock.stream555,mock.stream556,mock.stream557,mock.stream558,mock.stream559,mock.stream560,mock.stream561,mock.stream562,mock.stream563,mock.stream564,mock.stream565,mock.stream566,mock.stream567,mock.stream568,mock.stream569,mock.stream570,mock.stream571,mock.stream572,mock.stream573,mock.stream574,mock.stream575,mock.stream576,mock.stream577,mock.stream578,mock.stream579,mock.stream580,mock.stream581,mock.stream582,mock.stream583,mock.stream584,mock.stream585,mock.stream586,mock.stream587,mock.stream588,mock.stream589,mock.stream590,mock.stream591,mock.stream592,mock.stream593,mock.stream594,mock.stream595,mock.stream596,mock.stream597,mock.stream598,mock.stream599,mock.stream600,mock.stream601,mock.stream602,mock.stream603,mock.stream
 604,mock.stream605,mock.stream606,mock.stream607,mock.stream608,mock.stream609,mock.stream610,mock.stream611,mock.stream612,mock.stream613,mock.stream614,mock.stream615,mock.stream616,mock.stream617,mock.stream618,mock.stream619,mock.stream620,mock.stream621,mock.stream622,mock.stream623,mock.stream624,mock.stream625,mock.stream626,mock.stream627,mock.stream628,mock.stream629,mock.stream630,mock.stream631,mock.stream632,mock.stream633,mock.stream634,mock.stream635,mock.stream636,mock.stream637,mock.stream638,mock.stream639,mock.stream640,mock.stream641,mock.stream642,mock.stream643,mock.stream644,mock.stream645,mock.stream646,mock.stream647,mock.stream648,mock.stream649,mock.stream650,mock.stream651,mock.stream652,mock.stream653,mock.stream654,mock.stream655,mock.stream656,mock.stream657,mock.stream658,mock.stream659,mock.stream660,mock.stream661,mock.stream662,mock.stream663,mock.stream664,mock.stream665,mock.stream666,mock.stream667,mock.stream668,mock.stream669,mock.stream670,moc
 k.stream671,mock.stream672,mock.stream673,mock.stream674,mock.stream675,mock.stream676,mock.stream677,mock.stream678,mock.stream679,mock.stream680,mock.stream681,mock.stream682,mock.stream683,mock.stream684,mock.stream685,mock.stream686,mock.stream687,mock.stream688,mock.stream689,mock.stream690,mock.stream691,mock.stream692,mock.stream693,mock.stream694,mock.stream695,mock.stream696,mock.stream697,mock.stream698,mock.stream699,mock.stream700,mock.stream701,mock.stream702,mock.stream703,mock.stream704,mock.stream705,mock.stream706,mock.stream707,mock.stream708,mock.stream709,mock.stream710,mock.stream711,mock.stream712,mock.stream713,mock.stream714,mock.stream715,mock.stream716,mock.stream717,mock.stream718,mock.stream719,mock.stream720,mock.stream721,mock.stream722,mock.stream723,mock.stream724,mock.stream725,mock.stream726,mock.stream727,mock.stream728,mock.stream729,mock.stream730,mock.stream731,mock.stream732,mock.stream733,mock.stream734,mock.stream735,mock.stream736,mock.strea
 m737,mock.stream738,mock.stream739,mock.stream740,mock.stream741,mock.stream742,mock.stream743,mock.stream744,mock.stream745,mock.stream746,mock.stream747,mock.stream748,mock.stream749,mock.stream750,mock.stream751,mock.stream752,mock.stream753,mock.stream754,mock.stream755,mock.stream756,mock.stream757,mock.stream758,mock.stream759,mock.stream760,mock.stream761,mock.stream762,mock.stream763,mock.stream764,mock.stream765,mock.stream766,mock.stream767,mock.stream768,mock.stream769,mock.stream770,mock.stream771,mock.stream772,mock.stream773,mock.stream774,mock.stream775,mock.stream776,mock.stream777,mock.stream778,mock.stream779,mock.stream780,mock.stream781,mock.stream782,mock.stream783,mock.stream784,mock.stream785,mock.stream786,mock.stream787,mock.stream788,mock.stream789,mock.stream790,mock.stream791,mock.stream792,mock.stream793,mock.stream794,mock.stream795,mock.stream796,mock.stream797,mock.stream798,mock.stream799,mock.stream800,mock.stream801,mock.stream802,mock.stream803,mo
 ck.stream804,mock.stream805,mock.stream806,mock.stream807,mock.stream808,mock.stream809,mock.stream810,mock.stream811,mock.stream812,mock.stream813,mock.stream814,mock.stream815,mock.stream816,mock.stream817,mock.stream818,mock.stream819,mock.stream820,mock.stream821,mock.stream822,mock.stream823,mock.stream824,mock.stream825,mock.stream826,mock.stream827,mock.stream828,mock.stream829,mock.stream830,mock.stream831,mock.stream832,mock.stream833,mock.stream834,mock.stream835,mock.stream836,mock.stream837,mock.stream838,mock.stream839,mock.stream840,mock.stream841,mock.stream842,mock.stream843,mock.stream844,mock.stream845,mock.stream846,mock.stream847,mock.stream848,mock.stream849,mock.stream850,mock.stream851,mock.stream852,mock.stream853,mock.stream854,mock.stream855,mock.stream856,mock.stream857,mock.stream858,mock.stream859,mock.stream860,mock.stream861,mock.stream862,mock.stream863,mock.stream864,mock.stream865,mock.stream866,mock.stream867,mock.stream868,mock.stream869,mock.stre
 am870,mock.stream871,mock.stream872,mock.stream873,mock.stream874,mock.stream875,mock.stream876,mock.stream877,mock.stream878,mock.stream879,mock.stream880,mock.stream881,mock.stream882,mock.stream883,mock.stream884,mock.stream885,mock.stream886,mock.stream887,mock.stream888,mock.stream889,mock.stream890,mock.stream891,mock.stream892,mock.stream893,mock.stream894,mock.stream895,mock.stream896,mock.stream897,mock.stream898,mock.stream899,mock.stream900,mock.stream901,mock.stream902,mock.stream903,mock.stream904,mock.stream905,mock.stream906,mock.stream907,mock.stream908,mock.stream909,mock.stream910,mock.stream911,mock.stream912,mock.stream913,mock.stream914,mock.stream915,mock.stream916,mock.stream917,mock.stream918,mock.stream919,mock.stream920,mock.stream921,mock.stream922,mock.stream923,mock.stream924,mock.stream925,mock.stream926,mock.stream927,mock.stream928,mock.stream929,mock.stream930,mock.stream931,mock.stream932,mock.stream933,mock.stream934,mock.stream935,mock.stream936,m
 ock.stream937,mock.stream938,mock.stream939,mock.stream940,mock.stream941,mock.stream942,mock.stream943,mock.stream944,mock.stream945,mock.stream946,mock.stream947,mock.stream948,mock.stream949,mock.stream950,mock.stream951,mock.stream952,mock.stream953,mock.stream954,mock.stream955,mock.stream956,mock.stream957,mock.stream958,mock.stream959,mock.stream960,mock.stream961,mock.stream962,mock.stream963,mock.stream964,mock.stream965,mock.stream966,mock.stream967,mock.stream968,mock.stream969,mock.stream970,mock.stream971,mock.stream972,mock.stream973,mock.stream974,mock.stream975,mock.stream976,mock.stream977,mock.stream978,mock.stream979,mock.stream980,mock.stream981,mock.stream982,mock.stream983,mock.stream984,mock.stream985,mock.stream986,mock.stream987,mock.stream988,mock.stream989,mock.stream990,mock.stream991,mock.stream992,mock.stream993,mock.stream994,mock.stream995,mock.stream996,mock.stream997,mock.stream998,mock.stream999,
 
-# Kafka System
+# Mock system which produces random messages
 systems.mock.samza.factory=org.apache.samza.system.mock.MockSystemFactory
+
+# Kafka System (only used for coordinator stream in this test)
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Define the coordinator system
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
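
With the input streams served by the mock system, the Kafka system above exists only to host the coordinator stream that the job writes its config and checkpoints to. A minimal sketch (Scala, using the MapConfig-plus-JavaConversions pattern the tests in this patch already use) of the same coordinator settings built programmatically; the values mirror the properties above:

    import org.apache.samza.config.MapConfig
    import scala.collection.JavaConversions._

    // Coordinator-stream settings equivalent to the properties added above.
    val coordinatorConfig = new MapConfig(Map(
      "job.coordinator.system" -> "kafka",
      "job.coordinator.replication.factor" -> "1",
      "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
      "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181/",
      "systems.kafka.producer.bootstrap.servers" -> "localhost:9092"))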

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/perf/kafka-read-write-performance.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kafka-read-write-performance.properties b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
index 122b14a..112caba 100644
--- a/samza-test/src/main/config/perf/kafka-read-write-performance.properties
+++ b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
@@ -33,3 +33,6 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.offset.default=oldest
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Coordinator stream replication factor
+job.coordinator.replication.factor=1

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index c0a20af..8c95211 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -22,6 +22,7 @@ package org.apache.samza.system.mock;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
@@ -54,12 +55,7 @@ public class MockSystemAdmin implements SystemAdmin {
     return metadata;
   }
 
-    @Override
-    public void createChangelogStream(String streamName, int numOfPartitions) {
-        throw new SamzaException("Method not implemented");
-    }
-
-    @Override
+  @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
     Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
 
@@ -69,4 +65,14 @@ public class MockSystemAdmin implements SystemAdmin {
 
     return offsetsAfter;
   }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("Method not implemented");
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException("Method not implemented.");
+  }
 }
\ No newline at end of file
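
createCoordinatorStream(String) is the new SystemAdmin hook introduced by this patch; mock admins stub it out, and real implementations are expected to create the topic backing the coordinator stream. A hedged sketch of the call pattern (the factory, config, and topic name below are illustrative assumptions, not taken from this diff):

    // Hypothetical usage: before the job starts, the framework asks the
    // coordinator system's admin to create the coordinator stream.
    val admin = systemFactory.getAdmin("kafka", config)       // assumed to be in scope
    admin.createCoordinatorStream("samza-coordinator-stream") // hypothetical topic name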

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/python/samza_failure_testing.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/samza_failure_testing.py b/samza-test/src/main/python/samza_failure_testing.py
index 198db26..28a56b1 100755
--- a/samza-test/src/main/python/samza_failure_testing.py
+++ b/samza-test/src/main/python/samza_failure_testing.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+
 #################################################################################################################################
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 45c76e8..d1f1d84 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -27,10 +27,10 @@ import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskCoordinator.RequestScope
 import org.apache.samza.config.Config
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Util, Logging}
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.samza.util.Util
+
 
 object TestPerformanceTask {
   // No thread safety is needed for these variables because they're mutated in 

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index d66b3bd..8200696 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,6 +22,8 @@ package org.apache.samza.test.integration
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
+import java.util
+
 import kafka.admin.AdminUtils
 import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConfig
@@ -54,14 +56,15 @@ import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskCoordinator.RequestScope
 import org.apache.samza.util.ClientUtilTopicMetadataStore
 import org.apache.samza.util.TopicMetadataStore
+import org.apache.samza.job.JobRunner
 import org.junit.Assert._
 import org.junit.{BeforeClass, AfterClass, Test}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.SynchronizedMap
+import org.apache.samza.job.JobRunner
 import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
-import java.util
 import org.apache.samza.util.KafkaUtil
 
 object TestStatefulTask {
@@ -193,10 +196,9 @@ object TestStatefulTask {
  */
 class TestStatefulTask {
   import TestStatefulTask._
-  val jobFactory = new ThreadJobFactory
 
   val jobConfig = Map(
-    "job.factory.class" -> jobFactory.getClass.getCanonicalName,
+    "job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName,
     "job.name" -> "hello-stateful-world",
     "task.class" -> "org.apache.samza.test.integration.TestTask",
     "task.inputs" -> "kafka.input",
@@ -206,7 +208,6 @@ class TestStatefulTask {
     "stores.mystore.msg.serde" -> "string",
     "stores.mystore.changelog" -> "kafka.mystoreChangelog",
     "stores.mystore.changelog.replication.factor" -> "1",
-
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
     // Always start consuming at offset 0. This avoids a race condition between
     // the producer and the consumer in this test (SAMZA-166, SAMZA-224).
@@ -319,21 +320,15 @@ class TestStatefulTask {
    * time, number of partitions, etc.
    */
   def startJob = {
-    val job = jobFactory.getJob(new MapConfig(jobConfig))
-
     // Start task.
-    job.submit
+    val job = new JobRunner(new MapConfig(jobConfig)).run
     assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000))
     TestTask.awaitTaskRegistered
     val tasks = TestTask.tasks
-
     assertEquals("Should only have a single partition in this task", 1, tasks.size)
-
     val task = tasks.values.toList.head
-
     task.initFinished.await(60, TimeUnit.SECONDS)
     assertEquals(0, task.initFinished.getCount)
-
     (job, task)
   }
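
The test now launches through JobRunner instead of holding a ThreadJobFactory instance; under SAMZA-465, JobRunner is what seeds the coordinator stream with the job config before submitting the job. The new launch pattern, condensed from the hunk above:

    import org.apache.samza.config.MapConfig
    import org.apache.samza.job.{ApplicationStatus, JobRunner}
    import scala.collection.JavaConversions._

    // jobConfig is the Map[String, String] defined earlier in this test.
    val job = new JobRunner(new MapConfig(jobConfig)).run
    job.waitForStatus(ApplicationStatus.Running, 60000)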
 

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 5da1c35..1f51551 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -26,7 +26,6 @@ object YarnConfig {
   val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
   val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
   val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
-  val CONTAINER_COUNT = "yarn.container.count"
   val AM_JVM_OPTIONS = "yarn.am.opts"
   val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
   val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
@@ -47,8 +46,6 @@ class YarnConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
 
-  def getContainerCount: Option[Int] = getOption(YarnConfig.CONTAINER_COUNT).map(_.toInt)
-
   def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
 
   def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB).map(_.toInt)
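
The yarn.container.count key and its accessor leave YarnConfig here; container count is now resolved through JobConfig (note the Config2Job imports added in the YARN classes below), so the setting no longer lives in cluster-manager-specific config. The replacement call site, from the task-manager hunk further down:

    import org.apache.samza.config.JobConfig.Config2Job

    // Container count now comes from job-level config (defaulting to one
    // container when unspecified) rather than from YarnConfig.
    state.containerCount = config.getContainerCount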

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index a1dbe04..20aa373 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -39,6 +39,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.util.hadoop.HttpFileSystem
 import org.apache.samza.util.Logging
 import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.coordinator.JobCoordinator
 import org.apache.samza.SamzaException
 
 /**
@@ -70,8 +71,12 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
     info("got node manager port: %s" format nodePortString)
     val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
     info("got node manager http port: %s" format nodeHttpPortString)
-    val config = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_CONFIG), classOf[Config]))
-    info("got config: %s" format config)
+    val coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG), classOf[Config]))
+    info("got coordinator system config: %s" format coordinatorSystemConfig)
+    val registry = new MetricsRegistryMap
+    val jobCoordinator = JobCoordinator(coordinatorSystemConfig, registry)
+    val config = jobCoordinator.jobModel.getConfig
+    info("got config: %s" format coordinatorSystemConfig)
     putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name")))
     putMDC("jobId", config.getJobId.getOrElse("1"))
     val hConfig = new YarnConfiguration
@@ -79,14 +84,13 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
     val interval = config.getAMPollIntervalMs.getOrElse(DEFAULT_POLL_INTERVAL_MS)
     val amClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this)
     val clientHelper = new ClientHelper(hConfig)
-    val registry = new MetricsRegistryMap
     val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)
     val containerCpu = config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES)
     val jmxServer = if (new YarnConfig(config).getJmxServerEnabled) Some(new JmxServer()) else None
 
     try {
       // wire up all of the yarn event listeners
-      val state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+      val state = new SamzaAppMasterState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
       val service = new SamzaAppMasterService(config, state, registry, clientHelper)
       val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient)
       val metrics = new SamzaAppMasterMetrics(config, state, registry)
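
This hunk is the heart of SAMZA-465 on the application-master side: the AM no longer receives the full job config through ENV_CONFIG. It is handed only the small coordinator-system config, builds a JobCoordinator from it, and reads the complete job config (and job model) back out of the coordinator stream. The bootstrap sequence, condensed from the hunk above (imports as in the file):

    // env var -> coordinator config -> JobCoordinator -> full job config
    val coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper
      .readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG), classOf[Config]))
    val jobCoordinator = JobCoordinator(coordinatorSystemConfig, new MetricsRegistryMap)
    val config = jobCoordinator.jobModel.getConfig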

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index b1e5546..1445605 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -33,7 +33,7 @@ import org.apache.samza.coordinator.JobCoordinator
  * responses from YARN's resource manager (via SamzaAppMasterTaskManager). This
  * class holds the current state of the application master.
  */
-class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterState(val jobCoordinator: JobCoordinator, val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
   // controlled by the AM
   var completedContainers = 0
   var neededContainers = 0
@@ -43,7 +43,6 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
   var runningContainers = Map[Int, YarnContainer]()
   var unclaimedContainers = Set[Int]()
   var finishedContainers = Set[Int]()
-  var jobCoordinator: JobCoordinator = null
   var status = FinalApplicationStatus.UNDEFINED
   var jobHealthy = true
 

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 38e1f5f..1743c86 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -37,13 +37,13 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.util.Records
 import org.apache.samza.config.Config
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.YarnConfig
 import org.apache.samza.config.YarnConfig.Config2Yarn
 import org.apache.samza.job.CommandBuilder
 import org.apache.samza.job.ShellCommandBuilder
-import org.apache.samza.util.Util
 import org.apache.samza.util.Logging
 import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.util.{Util, Logging}
+import org.apache.samza.config.JobConfig.Config2Job
 
 object SamzaAppMasterTaskManager {
   val DEFAULT_CONTAINER_MEM = 1024
@@ -63,13 +63,7 @@ case class ContainerFailure(val count: Int, val lastFailure: Long)
 class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
   import SamzaAppMasterTaskManager._
 
-  state.containerCount = config
-    .getContainerCount
-    .getOrElse({
-      info("No %s specified. Defaulting to one container." format YarnConfig.CONTAINER_COUNT)
-      1
-    })
-  state.jobCoordinator = JobCoordinator(config, state.containerCount)
+  state.containerCount = config.getContainerCount
 
   var containerFailures = Map[Int, ContainerFailure]()
   var tooManyFailedContainers = false

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 24b11da..8dd70c9 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig
 import org.apache.samza.util.Util
 import org.apache.samza.job.ApplicationStatus
 import org.apache.samza.job.ApplicationStatus.Running
@@ -36,6 +37,11 @@ import org.apache.samza.config.YarnConfig
 import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.SamzaException
 import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.config.JobConfig.Config2Job
+import scala.collection.JavaConversions._
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.SystemConfig
 
 object YarnJob {
   val DEFAULT_AM_CONTAINER_MEM = 1024
@@ -59,8 +65,9 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
         "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
           format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
       Some({
+        val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
         val envMap = Map(
-          ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)),
+          ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)),
           ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))
         val envMapWithJavaHome = config.getAMJavaHome match {
           case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME -> javaHome)
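
YarnJob is the producer of the environment variable the AM consumes above: it extracts the coordinator subset of the config with Util.buildCoordinatorStreamConfig and ships it as ENV_COORDINATOR_SYSTEM_CONFIG. Paired with the AM-side hunk, the round trip looks like this (condensed from the two hunks; names as in the diff):

    // Submit side (YarnJob): serialize the coordinator config into the AM env.
    val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
    val json = SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)
    val envMap = Map(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> Util.envVarEscape(json))
    // AM side (SamzaAppMaster): deserialize the env var and bootstrap, as shown earlier.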

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 765f72f..df5992e 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -34,8 +34,10 @@ import org.junit.Test
 import scala.annotation.elidable
 import scala.annotation.elidable.ASSERTION
 import java.net.URL
+import org.apache.samza.coordinator.JobCoordinator
 
 class TestSamzaAppMasterLifecycle {
+  val coordinator = new JobCoordinator(null, null, null)
   val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
     var host = ""
     var port = 0
@@ -79,7 +81,7 @@ class TestSamzaAppMasterLifecycle {
 
   @Test
   def testLifecycleShouldRegisterOnInit {
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
+    val state = new SamzaAppMasterState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
     state.rpcUrl = new URL("http://localhost:1")
     state.trackingUrl = new URL("http://localhost:2")
     val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient)
@@ -91,7 +93,7 @@ class TestSamzaAppMasterLifecycle {
 
   @Test
   def testLifecycleShouldUnregisterOnShutdown {
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+    val state = new SamzaAppMasterState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.status = FinalApplicationStatus.SUCCEEDED
     new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown
     assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
@@ -111,7 +113,7 @@ class TestSamzaAppMasterLifecycle {
 
   @Test
   def testLifecycleShouldShutdownOnInvalidContainerSettings {
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
+    val state = new SamzaAppMasterState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
     state.rpcUrl = new URL("http://localhost:1")
     state.trackingUrl = new URL("http://localhost:2")
     List(new SamzaAppMasterLifecycle(768, 1, state, amClient),

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index 81dea9d..6f4bfaf 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -24,6 +24,7 @@ import java.net.URL
 import java.io.InputStreamReader
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.junit.Assert._
 import org.junit.Test
 import scala.collection.JavaConversions._
@@ -32,17 +33,16 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 
 class TestSamzaAppMasterService {
   @Test
   def testAppMasterDashboardShouldStart {
     val config = getDummyConfig
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
+    val state = new SamzaAppMasterState(JobCoordinator(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
     val service = new SamzaAppMasterService(config, state, null, null)
     val taskName = new TaskName("test")
 
-    state.jobCoordinator = JobCoordinator(config, 1)
-
     // start the dashboard
     service.onInit
     assertTrue(state.rpcUrl.getPort > 0)
@@ -70,11 +70,9 @@ class TestSamzaAppMasterService {
   def testAppMasterDashboardWebServiceShouldStart {
     // Create some dummy config
     val config = getDummyConfig
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
+    val state = new SamzaAppMasterState(JobCoordinator(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
     val service = new SamzaAppMasterService(config, state, null, null)
 
-    state.jobCoordinator = JobCoordinator(config, 1)
-
     // start the dashboard
     service.onInit
     assertTrue(state.rpcUrl.getPort > 0)
@@ -94,6 +92,7 @@ class TestSamzaAppMasterService {
   }
 
   private def getDummyConfig: Config = new MapConfig(Map[String, String](
+    "job.name" -> "test",
     "yarn.container.count" -> "1",
     "systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory",
     "yarn.container.memory.mb" -> "512",
@@ -102,5 +101,7 @@ class TestSamzaAppMasterService {
     "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
     "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
     "yarn.container.retry.count" -> "1",
-    "yarn.container.retry.window.ms" -> "1999999999"))
+    "yarn.container.retry.window.ms" -> "1999999999",
+    "job.coordinator.system" -> "coordinator",
+    "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
 }
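
Because SamzaAppMasterState now takes a live JobCoordinator, even a dummy test config must name a coordinator system; MockCoordinatorStreamSystemFactory stands in for Kafka. The minimum extra wiring, condensed from the hunk above:

    import org.apache.samza.config.MapConfig
    import org.apache.samza.coordinator.JobCoordinator
    import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
    import scala.collection.JavaConversions._

    // Keys required so JobCoordinator(config) can bootstrap against the mock
    // coordinator-stream system in tests.
    val config = new MapConfig(Map(
      "job.name" -> "test",
      "job.coordinator.system" -> "coordinator",
      "systems.coordinator.samza.factory" ->
        classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
    val coordinator = JobCoordinator(config)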

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index ee2d0ea..1e936b4 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -31,12 +31,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.YarnConfig.Config2Yarn
 import org.apache.samza.config.MapConfig
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.Util
 import org.junit.Test
 import scala.collection.JavaConversions._
 import TestSamzaAppMasterTaskManager._
@@ -44,6 +44,11 @@ import java.net.URL
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.TaskModel
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
@@ -151,16 +156,26 @@ class TestSamzaAppMasterTaskManager {
     "yarn.container.retry.count" -> "1",
     "yarn.container.retry.window.ms" -> "1999999999"))
 
+  def getCoordinator(containerCount: Int = 1) = {
+    val containers = new java.util.HashMap[java.lang.Integer, ContainerModel]()
+    (0 until containerCount).foreach(idx => {
+      val container = new ContainerModel(idx, Map[TaskName, TaskModel]())
+      containers.put(new java.lang.Integer(idx), container)
+    })
+    val jobModel = new JobModel(config, containers)
+    new JobCoordinator(jobModel, null, null)
+  }
+
   @Test
-  def testAppMasterShouldDefaultToOneContainerIfContainerCountIsNotSpecified {
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+  def testAppMasterShouldDefaultToOneContainerIfTaskCountIsNotSpecified {
+    val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
     assertEquals(1, state.containerCount)
   }
 
   @Test
   def testAppMasterShouldStopWhenContainersFinish {
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+    val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
 
     assertFalse(taskManager.shouldShutdown)
@@ -175,7 +190,7 @@ class TestSamzaAppMasterTaskManager {
   @Test
   def testAppMasterShouldRequestANewContainerWhenATaskFails {
     val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+    val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.coordinatorUrl = new URL("http://localhost:1234")
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
       override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
@@ -210,7 +225,7 @@ class TestSamzaAppMasterTaskManager {
   @Test
   def testAppMasterShouldRequestANewContainerWhenATaskIsReleased {
     val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+    val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.coordinatorUrl = new URL("http://localhost:1234")
     state.containerCount = 2
     var containersRequested = 0
@@ -286,7 +301,7 @@ class TestSamzaAppMasterTaskManager {
     map.put("yarn.container.count", "2")
     val newConfig = new MapConfig(map)
     val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+    val state = new SamzaAppMasterState(getCoordinator(2), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.containerCount = 2
     state.coordinatorUrl = new URL("http://localhost:1234")
     var containersStarted = 0
@@ -348,7 +363,7 @@ class TestSamzaAppMasterTaskManager {
   @Test
   def testAppMasterShouldReleaseExtraContainers {
     val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
-    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+    val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.coordinatorUrl = new URL("http://localhost:1234")
     var containersRequested = 0
     var containersStarted = 0
@@ -403,8 +418,7 @@ class MockSystemFactory extends SystemFactory {
   }
 
   def getAdmin(systemName: String, config: Config) = {
-    val containerCount = config.getContainerCount.getOrElse(1)
-    new MockSystemAdmin(containerCount)
+    new MockSystemAdmin(config.getContainerCount)
   }
 }
 
@@ -422,5 +436,11 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
     }).toMap[String, SystemStreamMetadata]
   }
 
-  override def createChangelogStream(streamName: String, numOfPartitions: Int) = ???
+  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  override def createCoordinatorStream(streamName: String) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
 }


[2/4] samza git commit: SAMZA-465: Use coordinator stream and eliminate CheckpointManager

Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
deleted file mode 100644
index 10ff1f4..0000000
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.file
-
-import java.io.File
-import scala.collection.JavaConversions._
-import java.util.Random
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import org.apache.samza.SamzaException
-import org.apache.samza.Partition
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.container.TaskName
-import org.junit.rules.TemporaryFolder
-
-class TestFileSystemCheckpointManager  {
-  val checkpointRoot = System.getProperty("java.io.tmpdir") // TODO: Move this out of tmp, into our build dir
-  val taskName = new TaskName("Warwickshire")
-  val baseFileLocation = new File(checkpointRoot)
-
-  val tempFolder = new TemporaryFolder
-
-  @Before
-  def createTempFolder = tempFolder.create()
-
-  @After
-  def deleteTempFolder = tempFolder.delete()
-
-  @Test
-  def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
-    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
-    assertNull(cpm.readLastCheckpoint(taskName))
-  }
-
-  @Test
-  def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
-    val cp = new Checkpoint(Map(
-          new SystemStreamPartition("a", "b", new Partition(0)) -> "c",
-          new SystemStreamPartition("a", "c", new Partition(1)) -> "d",
-          new SystemStreamPartition("b", "d", new Partition(2)) -> "e"))
-
-    var readCp:Checkpoint = null
-    val cpm =  new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
-
-    cpm.start
-    cpm.writeCheckpoint(taskName, cp)
-    readCp = cpm.readLastCheckpoint(taskName)
-    cpm.stop
-
-    assertNotNull(readCp)
-    cp.equals(readCp)
-    assertEquals(cp.getOffsets.keySet(), readCp.getOffsets.keySet())
-    assertEquals(cp.getOffsets, readCp.getOffsets)
-    assertEquals(cp, readCp)
-  }
-
-  @Test
-  def testMissingRootDirectoryShouldFailOnManagerCreation {
-    val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot + new Random().nextInt))
-    try {
-      cpm.start
-      fail("Expected an exception since root directory for fs checkpoint manager doesn't exist.")
-    } catch {
-      case e: SamzaException => None // this is expected
-    }
-    cpm.stop
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index cab31ca..58d7fe8 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.container
 
+import java.util
 import scala.collection.JavaConversions._
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
@@ -29,7 +30,6 @@ import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
-import org.apache.samza.metrics.JmxServer
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.StreamMetadataCache
@@ -54,21 +54,25 @@ import org.scalatest.junit.AssertionsForJUnit
 import java.lang.Thread.UncaughtExceptionHandler
 import org.apache.samza.serializers._
 import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test
   def testReadJobModel {
     val config = new MapConfig(Map("a" -> "b"))
+    val offsets = new util.HashMap[SystemStreamPartition, String]()
+    offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
     val tasks = Map(
-      new TaskName("t1") -> new TaskModel(new TaskName("t1"), Set[SystemStreamPartition](), new Partition(0)),
-      new TaskName("t2") -> new TaskModel(new TaskName("t2"), Set[SystemStreamPartition](), new Partition(0)))
+      new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)),
+      new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(0)))
     val containers = Map(
       Integer.valueOf(0) -> new ContainerModel(0, tasks),
       Integer.valueOf(1) -> new ContainerModel(1, tasks))
     val jobModel = new JobModel(config, containers)
+    def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
-    val coordinator = new JobCoordinator(jobModel, server)
-    coordinator.server.addServlet("/*", new JobServlet(jobModel))
+    val coordinator = new JobCoordinator(jobModel, server, new MockCheckpointManager)
+    coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator))
     try {
       coordinator.start
       assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -180,7 +184,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
     val config = new MapConfig
     assertTrue(defaultSerdesFromSerdeName("byte", "testSystemException", config).isInstanceOf[ByteSerde])
     assertTrue(defaultSerdesFromSerdeName("integer", "testSystemException", config).isInstanceOf[IntegerSerde])
-    assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde])
+    assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde[Object]])
     assertTrue(defaultSerdesFromSerdeName("long", "testSystemException", config).isInstanceOf[LongSerde])
     assertTrue(defaultSerdesFromSerdeName("serializable", "testSystemException", config).isInstanceOf[SerializableSerde[java.io.Serializable @unchecked]])
     assertTrue(defaultSerdesFromSerdeName("string", "testSystemException", config).isInstanceOf[StringSerde])
@@ -196,3 +200,8 @@ class TestSamzaContainer extends AssertionsForJUnit {
     assertTrue(throwSamzaException)
   }
 }
+
+class MockCheckpointManager extends CheckpointManager(null, null) {
+  override def start() = {}
+  override def stop() = {}
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
index 1ee5c06..2c7cb28 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -68,6 +68,6 @@ class TestGroupByContainerCount {
   }
 
   private def getTaskModel(name: String, partitionId: Int) = {
-    new TaskModel(new TaskName(name), Set[SystemStreamPartition](), new Partition(partitionId))
+    new TaskModel(new TaskName(name), Map[SystemStreamPartition, String](), new Partition(partitionId))
   }
 }

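Throughout this patch, TaskModel's second constructor argument changes from a Set[SystemStreamPartition] of inputs to a Map[SystemStreamPartition, String] of checkpointed offsets. A small sketch of constructing one under the new signature (values illustrative; the third argument remains the changelog partition):

    val offsets = new java.util.HashMap[SystemStreamPartition, String]()
    offsets.put(new SystemStreamPartition("test", "stream1", new Partition(0)), "42") // last checkpointed offset
    val taskModel = new TaskModel(new TaskName("Partition 0"), offsets, new Partition(0)) // changelog partition 0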
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index a8e5d36..d9ae187 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,17 +19,19 @@
 
 package org.apache.samza.coordinator
 
-import org.junit.Test
+
+import java.net.SocketTimeoutException
+
+import org.apache.samza.util.Util
+import org.junit.{After, Test}
 import org.junit.Assert._
+import org.junit.rules.ExpectedException
 import scala.collection.JavaConversions._
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.TaskConfig
 import org.apache.samza.config.SystemConfig
-import org.apache.samza.container.TaskName
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.container.{SamzaContainer, TaskName}
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsRegistry}
 import org.apache.samza.config.Config
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemAdmin
@@ -40,67 +42,209 @@ import org.apache.samza.Partition
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.TaskModel
+import org.apache.samza.config.JobConfig
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.coordinator.stream.{MockCoordinatorStreamWrappedConsumer, MockCoordinatorStreamSystemFactory}
 
 class TestJobCoordinator {
   /**
-   * Builds a coordinator from config, and then compares it with what was 
-   * expected. We simulate having a checkpoint manager that has 2 task 
-   * changelog entries, and our model adds a third task. Expectation is that 
-   * the JobCoordinator will assign the new task with a new changelog 
-   * partition.
+   * Builds a coordinator from config, and then compares it with what was
+   * expected. We simulate having a checkpoint manager that has 2 task
+   * changelog entries, and our model adds a third task. Expectation is that
+   * the JobCoordinator will assign the new task with a new changelog
+   * partition.
    */
   @Test
   def testJobCoordinator {
-    val containerCount = 2
-    val config = new MapConfig(Map(
-      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName,
-      TaskConfig.INPUT_STREAMS -> "test.stream1",
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
-    val coordinator = JobCoordinator(config, containerCount)
+    val task0Name = new TaskName("Partition 0")
+    val checkpoint0 = Map(new SystemStreamPartition("test", "stream1", new Partition(0)) -> "4")
+    val task1Name = new TaskName("Partition 1")
+    val checkpoint1 = Map(new SystemStreamPartition("test", "stream1", new Partition(1)) -> "3")
+    val task2Name = new TaskName("Partition 2")
+    val checkpoint2 = Map(new SystemStreamPartition("test", "stream1", new Partition(2)) -> null)
 
-    // Construct the expected JobModel, so we can compare it to 
+    // Construct the expected JobModel, so we can compare it to
     // JobCoordinator's JobModel.
+    val container0Tasks = Map(
+      task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
+      task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
+    val container1Tasks = Map(
+      task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
+    val containers = Map(
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
+      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
+
+
+    // The test does not pass offsets for task2 (Partition 2) to the checkpoint manager; this verifies that the task gets a null offset for that partition
+    val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+            task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
+    val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+            task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
+    val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
+    val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
+    val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
+
+    // Configs that the MockCoordinatorStream interprets as special entries,
+    // namely SetCheckpoint and SetChangelog
+    val otherConfigs = Map(
+      checkpointOffset0,
+      checkpointOffset1,
+      changelogInfo0,
+      changelogInfo1,
+      changelogInfo2
+    )
+
+    val config = Map(
+      JobConfig.JOB_NAME -> "test",
+      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+      JobConfig.JOB_CONTAINER_COUNT -> "2",
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+      )
+
+    // We want the mock system consumer to reuse the same instance across runs
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+    val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+    coordinator.start
+    val jobModel = new JobModel(new MapConfig(config), containers)
+    assertEquals(new MapConfig(config), coordinator.jobModel.getConfig)
+    assertEquals(jobModel, coordinator.jobModel)
+  }
+
+  @Test
+  def testJobCoordinatorCheckpointing {
     val task0Name = new TaskName("Partition 0")
+    val checkpoint0 = Map(new SystemStreamPartition("test", "stream1", new Partition(0)) -> "4")
     val task1Name = new TaskName("Partition 1")
+    val checkpoint1 = Map(new SystemStreamPartition("test", "stream1", new Partition(1)) -> "3")
     val task2Name = new TaskName("Partition 2")
+    val checkpoint2 = Map(new SystemStreamPartition("test", "stream1", new Partition(2)) -> "8")
+
+    // Construct the expected JobModel, so we can compare it to
+    // JobCoordinator's JobModel.
     val container0Tasks = Map(
-      task0Name -> new TaskModel(task0Name, Set(new SystemStreamPartition("test", "stream1", new Partition(0))), new Partition(4)),
-      task2Name -> new TaskModel(task2Name, Set(new SystemStreamPartition("test", "stream1", new Partition(2))), new Partition(5)))
+      task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
+      task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
     val container1Tasks = Map(
-      task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(3)))
+      task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
     val containers = Map(
       Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
       Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
-    val jobModel = new JobModel(config, containers)
-    assertEquals(config, coordinator.jobModel.getConfig)
-    assertEquals(jobModel, coordinator.jobModel)
+
+
+    // Initially, only task0's checkpoint and changelog entry are written; the remaining checkpoints are fed to the coordinator stream later in the test
+    val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+            task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
+    val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+            task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
+    val checkpointOffset2 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+            task2Name.getTaskName() -> (Util.sspToString(checkpoint2.keySet.iterator.next()) + ":" + checkpoint2.values.iterator.next())
+    val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
+    val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
+    val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
+
+    // Configs that the MockCoordinatorStream interprets as special entries,
+    // namely SetCheckpoint and SetChangelog.
+    // Write an initial checkpoint for the job coordinator to process
+    val otherConfigs = Map(
+      checkpointOffset0,
+      changelogInfo0
+    )
+
+    val config = Map(
+      JobConfig.JOB_NAME -> "test",
+      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+      JobConfig.JOB_CONTAINER_COUNT -> "2",
+      TaskConfig.INPUT_STREAMS -> "test.stream1",
+      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+      )
+
+    // Enable caching on MockConsumer to add more messages later
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+    // Start the job coordinator and verify that it serves the checkpoints over its HTTP port
+    val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+    coordinator.start
+    val url = coordinator.server.getUrl.toString
+
+    // Verify that the job coordinator has seen the checkpoints
+    var offsets = extractOffsetsFromJobCoordinator(url)
+    assertEquals(1, offsets.size)
+    assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
+
+    // Write more checkpoints
+    val wrappedConsumer = new MockCoordinatorStreamSystemFactory()
+            .getConsumer(null, null, null)
+            .asInstanceOf[MockCoordinatorStreamWrappedConsumer]
+
+    var moreMessageConfigs = Map(
+      checkpointOffset1
+    )
+
+    wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
+
+    // Verify that the coordinator has seen the new checkpoint
+    offsets = extractOffsetsFromJobCoordinator(url)
+    assertEquals(2, offsets.size)
+    assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
+    assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
+
+    // Write more checkpoints but block on read on the mock consumer
+    moreMessageConfigs = Map(
+      checkpointOffset2
+    )
+
+    wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
+
+    // Simulate the consumer being blocked (the job coordinator waiting to read new checkpoints from the coordinator stream after a container failure)
+    val latch = wrappedConsumer.blockPool()
+
+    // Verify that the HTTP read times out while the consumer is blocked
+    var seenException = false
+    try {
+      extractOffsetsFromJobCoordinator(url)
+    }
+    catch {
+      case se: SocketTimeoutException => seenException = true
+    }
+    assertTrue(seenException)
+
+    // Verify that the new checkpoints are visible once the job coordinator has loaded them
+    latch.countDown()
+    offsets = extractOffsetsFromJobCoordinator(url)
+    assertEquals(3, offsets.size)
+    assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
+    assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
+    assertEquals(checkpoint2.head._2, offsets.getOrElse(checkpoint2.head._1, fail()))
+    coordinator.stop
   }
-}
 
-object MockCheckpointManager {
-  var mapping: java.util.Map[TaskName, java.lang.Integer] = Map[TaskName, java.lang.Integer](
-    new TaskName("Partition 0") -> 4,
-    new TaskName("Partition 1") -> 3)
-}
+  def extractOffsetsFromJobCoordinator(url: String) = {
+    val jobModel = SamzaContainer.readJobModel(url)
+    val taskModels = jobModel.getContainers.values().flatMap(_.getTasks.values())
+    val offsets = taskModels.flatMap(_.getCheckpointedOffsets).toMap
+    offsets.filter(_._2 != null)
+  }
 
-class MockCheckpointManagerFactory extends CheckpointManagerFactory {
-  def getCheckpointManager(config: Config, registry: MetricsRegistry) = new MockCheckpointManager
-}
 
-class MockCheckpointManager extends CheckpointManager {
-  def start() {}
-  def register(taskName: TaskName) {}
-  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {}
-  def readLastCheckpoint(taskName: TaskName) = null
-  def readChangeLogPartitionMapping = MockCheckpointManager.mapping
-  def writeChangeLogPartitionMapping(mapping: java.util.Map[TaskName, java.lang.Integer]) {
-    MockCheckpointManager.mapping = mapping
+  @After
+  def tearDown() = {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
   }
-  def stop() {}
 }
 
 class MockSystemFactory extends SystemFactory {
-  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = null
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = new SystemConsumer {
+    def start() {}
+    def stop() {}
+    def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
+    def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = new java.util.HashMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
+  }
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = null
   def getAdmin(systemName: String, config: Config) = new MockSystemAdmin
 }
@@ -117,5 +261,11 @@ class MockSystemAdmin extends SystemAdmin {
     Map(streamNames.toList.head -> new SystemStreamMetadata("foo", partitionMetadata))
   }
 
-  override def createChangelogStream(streamName: String, numOfPartitions: Int) = ???
+  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+    // No-op in this mock; a real SystemAdmin would create the changelog stream here.
+  }
+
+  override def createCoordinatorStream(streamName: String) {
+    // No-op in this mock; a real SystemAdmin would create the coordinator stream here.
+  }
 }
\ No newline at end of file

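For readers tracing the mock protocol above: each checkpoint reaches the MockCoordinatorStreamWrappedConsumer as a config entry whose key carries a prefix plus the task name, and whose value is the serialized SystemStreamPartition followed by the offset. A hedged sketch of one such entry, mirroring the test (the exact value format is whatever Util.sspToString emits; JavaConversions is assumed in scope, as in the test):

    val ssp = new SystemStreamPartition("test", "stream1", new Partition(0))
    val key = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" + "Partition 0"
    val value = Util.sspToString(ssp) + ":" + "4" // serialized SSP, then the checkpointed offset
    wrappedConsumer.addMoreMessages(new MapConfig(Map(key -> value)))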
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
index 9a8e145..6ce8aa9 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
@@ -19,9 +19,9 @@
 
 package org.apache.samza.coordinator.server
 
+import org.apache.samza.util.Util
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.util.Util
 import java.net.URL
 
 class TestHttpServer {

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index 3a710a8..a1efe6f 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -56,7 +56,7 @@ class TestProcessJob {
   }
 }
 
-class MockJobCoordinator extends JobCoordinator(null, null) {
+class MockJobCoordinator extends JobCoordinator(null, null, null) {
   var stopped: Boolean = false
 
   override def start: Unit = { }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
index 6046071..4f1c14c 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
@@ -23,11 +23,13 @@ import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConversions._
+
 
 class TestJsonSerde {
   @Test
   def testJsonSerdeShouldWork {
-    val serde = new JsonSerde
+    val serde = new JsonSerde[java.util.HashMap[String, Object]]
     val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2)))
     val bytes = serde.toBytes(obj)
     assertEquals(obj, serde.fromBytes(bytes))

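JsonSerde is now parameterized by its target type, which is why the container test earlier asserts against JsonSerde[Object]. A round-trip sketch under the new signature:

    val serde = new JsonSerde[java.util.HashMap[String, Object]]
    val obj = new java.util.HashMap[String, Object]()
    obj.put("hi", "bye")
    val bytes = serde.toBytes(obj)
    assert(obj == serde.fromBytes(bytes)) // fromBytes deserializes back to the declared type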
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
deleted file mode 100644
index 5d8ee4f..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.checkpoint.kafka
-
-import java.util
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-import scala.collection.JavaConversions._
-
-/**
- * Kafka Checkpoint Log-specific key used to identify what type of entry is
- * written for any particular log entry.
- *
- * @param map Backing map to hold key values
- */
-class KafkaCheckpointLogKey private (val map: Map[String, String]) {
-  // This might be better as a case class...
-  import KafkaCheckpointLogKey._
-
-  /**
-   * Serialize this key to bytes
-   * @return Key as bytes
-   */
-  def toBytes(): Array[Byte] = {
-    val jMap = new util.HashMap[String, String](map.size)
-    jMap.putAll(map)
-
-    JSON_MAPPER.writeValueAsBytes(jMap)
-  }
-
-  private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY  + " in map for Kafka Checkpoint log key"))
-
-  /**
-   * Is this key for a checkpoint entry?
-   *
-   * @return true iff this key's entry is for a checkpoint
-   */
-  def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
-
-  /**
-   * Is this key for a changelog partition mapping?
-   *
-   * @return true iff this key's entry is for a changelog partition mapping
-   */
-  def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
-
-  /**
-   * If this Key is for a checkpoint entry, return its associated TaskName.
-   *
-   * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
-   */
-  def getCheckpointTaskName = {
-    val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
-    new TaskName(asString)
-  }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: KafkaCheckpointLogKey =>
-      (that canEqual this) &&
-        map == that.map
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    val state = Seq(map)
-    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
-  }
-}
-
-object KafkaCheckpointLogKey {
-  /**
-   *  Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
-   *  type, either a checkpoint or a changelog-partition-mapping.
-   */
-  val CHECKPOINT_KEY_KEY = "type"
-  val CHECKPOINT_KEY_TYPE = "checkpoint"
-  val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
-  val CHECKPOINT_TASKNAME_KEY = "taskName"
-  val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
-
-  /**
-   * Partition mapping keys have no dynamic values, so we just need one instance.
-   */
-  val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
-
-  private val JSON_MAPPER = new ObjectMapper()
-  val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
-
-  var systemStreamPartitionGrouperFactoryString:Option[String] = None
-
-  /**
-   * Set the name of the factory configured to provide the SystemStreamPartition grouping
-   * so it be included in the key.
-   *
-   * @param str Config value of SystemStreamPartition Grouper Factory
-   */
-  def setSystemStreamPartitionGrouperFactoryString(str:String) = {
-    systemStreamPartitionGrouperFactoryString = Some(str)
-  }
-
-  /**
-   * Get the name of the factory configured to provide the SystemStreamPartition grouping
-   * so it be included in the key
-   */
-  def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
-
-  /**
-   * Build a key for a a checkpoint log entry for a particular TaskName
-   * @param taskName TaskName to build for this checkpoint entry
-   *
-   * @return Key for checkpoint log entry
-   */
-  def getCheckpointKey(taskName:TaskName) = {
-    val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
-      CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
-      SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
-
-    new KafkaCheckpointLogKey(map)
-  }
-
-  /**
-   * Build a key for a changelog partition mapping entry
-   *
-   * @return Key for changelog partition mapping entry
-   */
-  def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
-
-  /**
-   * Deserialize a Kafka checkpoint log key
-   * @param bytes Serialized (via JSON) Kafka checkpoint log key
-   * @return Checkpoint log key
-   */
-  def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
-    try {
-      val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
-
-      if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
-        throw new SamzaException("No type entry in checkpoint key: " + jmap)
-      }
-
-      // Only checkpoint keys have ssp grouper factory keys
-      if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
-        val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
-
-        if (sspGrouperFactory == null) {
-          throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
-        }
-
-        if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
-          throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
-        }
-      }
-
-      new KafkaCheckpointLogKey(jmap.toMap)
-    } catch {
-      case e: Exception =>
-        throw new SamzaException("Exception while deserializing checkpoint key", e)
-    }
-  }
-}
-
-class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
-  override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
-    ") does not match value from current configuration (" + inConfig + ").  " +
-    "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
deleted file mode 100644
index c9504ec..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.kafka
-
-import org.apache.samza.util.Logging
-import java.nio.ByteBuffer
-import java.util
-import kafka.admin.AdminUtils
-import kafka.api._
-import kafka.common.ErrorMapping
-import kafka.common.InvalidMessageSizeException
-import kafka.common.TopicAndPartition
-import kafka.common.TopicExistsException
-import kafka.common.UnknownTopicOrPartitionException
-import kafka.consumer.SimpleConsumer
-import kafka.message.InvalidMessageException
-import kafka.utils.Utils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.container.TaskName
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.TopicMetadataStore
-import scala.collection.mutable
-import java.util.Properties
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
-import org.apache.samza.util.KafkaUtil
-
-/**
- * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
- * To read a checkpoint for a specific taskName, we find the newest message
- * keyed to that taskName. If there is no such message, no checkpoint data
- * exists.  The underlying log has a single partition into which all
- * checkpoints and TaskName to changelog partition mappings are written.
- */
-class KafkaCheckpointManager(
-  clientId: String,
-  checkpointTopic: String,
-  systemName: String,
-  replicationFactor: Int,
-  socketTimeout: Int,
-  bufferSize: Int,
-  fetchSize: Int,
-  metadataStore: TopicMetadataStore,
-  connectProducer: () => Producer[Array[Byte], Array[Byte]],
-  connectZk: () => ZkClient,
-  systemStreamPartitionGrouperFactoryString: String,
-  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-  serde: CheckpointSerde = new CheckpointSerde,
-  checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
-  import KafkaCheckpointManager._
-
-  var taskNames = Set[TaskName]()
-  var producer: Producer[Array[Byte], Array[Byte]] = null
-  var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
-
-  var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
-
-  KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
-
-  info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
-
-  /**
-   * Write Checkpoint for specified taskName to log
-   *
-   * @param taskName Specific Samza taskName of which to write a checkpoint of.
-   * @param checkpoint Reference to a Checkpoint object to store offset data in.
-   **/
-  override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
-    val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
-    val keyBytes = key.toBytes()
-    val msgBytes = serde.toBytes(checkpoint)
-
-    writeLog(CHECKPOINT_LOG4J_ENTRY, keyBytes, msgBytes)
-  }
-
-  /**
-   * Write the taskName to partition mapping that is being maintained by this CheckpointManager
-   *
-   * @param changelogPartitionMapping Each TaskName's partition within the changelog
-   */
-  override def writeChangeLogPartitionMapping(changelogPartitionMapping: util.Map[TaskName, java.lang.Integer]) {
-    val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
-    val keyBytes = key.toBytes()
-    val msgBytes = serde.changelogPartitionMappingToBytes(changelogPartitionMapping)
-
-    writeLog(CHANGELOG_PARTITION_MAPPING_LOG4j, keyBytes, msgBytes)
-  }
-
-  /**
-   * Common code for writing either checkpoints or changelog-partition-mappings to the log
-   *
-   * @param logType Type of entry that is being written, for logging
-   * @param key pre-serialized key for message
-   * @param msg pre-serialized message to write to log
-   */
-  private def writeLog(logType:String, key: Array[Byte], msg: Array[Byte]) {
-    retryBackoff.run(
-      loop => {
-        if (producer == null) {
-          producer = connectProducer()
-        }
-
-        producer.send(new ProducerRecord(checkpointTopic, 0, key, msg)).get()
-        loop.done
-      },
-
-      (exception, loop) => {
-        warn("Failed to write %s partition entry %s: %s. Retrying." format(logType, key, exception))
-        debug("Exception detail:", exception)
-        if (producer != null) {
-          producer.close
-        }
-        producer = null
-      }
-    )
-  }
-
-  private def getConsumer(): SimpleConsumer = {
-    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-    val metadata = metadataMap(checkpointTopic)
-    val partitionMetadata = metadata.partitionsMetadata
-      .filter(_.partitionId == 0)
-      .headOption
-      .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
-    val leader = partitionMetadata
-      .leader
-      .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
-    info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
-
-    new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
-  }
-
-  private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
-
-  private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
-    val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
-      .partitionErrorAndOffsets
-      .get(topicAndPartition)
-      .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
-    // Fail or retry if there was an an issue with the offset request.
-    KafkaUtil.maybeThrowException(offsetResponse.error)
-
-    val offset: Long = offsetResponse
-      .offsets
-      .headOption
-      .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
-
-    offset
-  }
-
-  /**
-   * Read the last checkpoint for specified TaskName
-   *
-   * @param taskName Specific Samza taskName for which to get the last checkpoint of.
-   **/
-  override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
-    if (!taskNames.contains(taskName)) {
-      throw new SamzaException(taskName + " not registered with this CheckpointManager")
-    }
-
-    info("Reading checkpoint for taskName " + taskName)
-
-    if (taskNamesToOffsets == null) {
-      info("No TaskName to checkpoint mapping provided.  Reading for first time.")
-      taskNamesToOffsets = readCheckpointsFromLog()
-    } else {
-      info("Already existing checkpoint mapping.  Merging new offsets")
-      taskNamesToOffsets ++= readCheckpointsFromLog()
-    }
-
-    val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null)
-
-    info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
-
-    checkpoint
-  }
-
-  /**
-   * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints
-   */
-  private def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
-    val checkpoints = mutable.Map[TaskName, Checkpoint]()
-
-    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
-
-    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
-      val taskName = checkpointKey.getCheckpointTaskName
-
-      if (taskNames.contains(taskName)) {
-        val checkpoint = serde.fromBytes(Utils.readBytes(payload))
-
-        debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
-
-        checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
-      }
-    }
-
-    readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
-
-    checkpoints.toMap /* of the immutable kind */
-  }
-
-  /**
-   * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping
-   *
-   * Lots of duplicated code from the checkpoint method, but will be better to refactor this code into AM-based
-   * checkpoint log reading
-   */
-  override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
-    var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
-
-    def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
-
-    def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
-      changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
-
-      debug("Adding changelog partition mapping" + changelogPartitionMapping)
-    }
-
-    readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint)
-
-    changelogPartitionMapping
-  }
-
-  /**
-   * Common code for reading both changelog partition mapping and change log
-   *
-   * @param entryType What type of entry to look for within the log key's
-   * @param handleEntry Code to handle an entry in the log once it's found
-   */
-  private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
-                      handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
-    retryBackoff.run[Unit](
-      loop => {
-        val consumer = getConsumer()
-
-        val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
-
-        try {
-          var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
-
-          info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
-
-          val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
-
-          info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
-
-          if (offset < 0) {
-            info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
-            return
-          }
-
-          while (offset < latestOffset) {
-            val request = new FetchRequestBuilder()
-              .addFetch(checkpointTopic, 0, offset, fetchSize)
-              .maxWait(500)
-              .minBytes(1)
-              .clientId(clientId)
-              .build
-
-            val fetchResponse = consumer.fetch(request)
-            if (fetchResponse.hasError) {
-              warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
-              val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
-              if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
-                warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
-                return
-              }
-              KafkaUtil.maybeThrowException(errorCode)
-            }
-
-            for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
-              offset = response.nextOffset
-              startingOffset = Some(offset) // For next time we call
-
-              if (!response.message.hasKey) {
-                throw new KafkaCheckpointException("Encountered message without key.")
-              }
-
-              val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
-
-              if (!shouldHandleEntry(checkpointKey)) {
-                debug("Skipping " + entryType + " entry with key " + checkpointKey)
-              } else {
-                handleEntry(response.message.payload, checkpointKey)
-              }
-            }
-          }
-        } finally {
-          consumer.close()
-        }
-
-        loop.done
-        Unit
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
-          case e: KafkaCheckpointException => throw e
-          case e: Exception =>
-            warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
-            debug("Exception detail:", e)
-        }
-      }
-    ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
-
-  }
-
-  def start {
-    create
-    validateTopic
-  }
-
-  def register(taskName: TaskName) {
-    debug("Adding taskName " + taskName + " to " + this)
-    taskNames += taskName
-  }
-
-  def stop = {
-    if (producer != null) {
-      producer.close
-    }
-  }
-
-  def create {
-    info("Attempting to create checkpoint topic %s." format checkpointTopic)
-    retryBackoff.run(
-      loop => {
-        val zkClient = connectZk()
-        try {
-          AdminUtils.createTopic(
-            zkClient,
-            checkpointTopic,
-            1,
-            replicationFactor,
-            checkpointTopicProperties)
-        } finally {
-          zkClient.close
-        }
-
-        info("Created checkpoint topic %s." format checkpointTopic)
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: TopicExistsException =>
-            info("Checkpoint topic %s already exists." format checkpointTopic)
-            loop.done
-          case e: Exception =>
-            warn("Failed to create topic %s: %s. Retrying." format(checkpointTopic, e))
-            debug("Exception detail:", e)
-        }
-      }
-    )
-  }
-
-  private def validateTopic {
-    info("Validating checkpoint topic %s." format checkpointTopic)
-    retryBackoff.run(
-      loop => {
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
-        val topicMetadata = topicMetadataMap(checkpointTopic)
-        KafkaUtil.maybeThrowException(topicMetadata.errorCode)
-
-        val partitionCount = topicMetadata.partitionsMetadata.length
-        if (partitionCount != 1) {
-          throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length))
-        }
-
-        info("Successfully validated checkpoint topic %s." format checkpointTopic)
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: KafkaCheckpointException => throw e
-          case e: Exception =>
-            warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e))
-            debug("Exception detail:", e)
-        }
-      }
-    )
-  }
-
-  override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
-}
-
-object KafkaCheckpointManager {
-  val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
-  val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
-}
-
-/**
- * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
- * one to signal a hard failure, and the other to retry. The
- * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
- * CheckpointManager can't recover from.
- */
-class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
-  def this(s: String) = this(s, null)
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
deleted file mode 100644
index 3dfa26a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.kafka
-
-import org.apache.samza.util.Logging
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
-import java.util.Properties
-import scala.collection.JavaConversions._
-import org.apache.kafka.clients.producer.KafkaProducer
-
-object KafkaCheckpointManagerFactory {
-  /**
-   * Version number to track the format of the checkpoint log
-   */
-  val CHECKPOINT_LOG_VERSION_NUMBER = 1
-
-  val INJECTED_PRODUCER_PROPERTIES = Map(
-    "acks" -> "all",
-    // Forcibly disable compression because Kafka doesn't support compression
-    // on log compacted topics. Details in SAMZA-586.
-    "compression.type" -> "none")
-
-  // Set the checkpoint topic configs to have a very small segment size and
-  // enable log compaction. This keeps job startup time small since there 
-  // are fewer useless (overwritten) messages to read from the checkpoint 
-  // topic.
-  def getCheckpointTopicProperties(config: Config) = {
-    val segmentBytes = config
-      .getCheckpointSegmentBytes
-      .getOrElse("26214400")
-
-    (new Properties /: Map(
-      "cleanup.policy" -> "compact",
-      "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
-  }
-}
-
-class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
-  import KafkaCheckpointManagerFactory._
-
-  def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
-    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
-    val systemName = config
-      .getCheckpointSystem
-      .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
-    val producerConfig = config.getKafkaSystemProducerConfig(
-      systemName,
-      clientId,
-      INJECTED_PRODUCER_PROPERTIES)
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
-    val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt
-    val socketTimeout = consumerConfig.socketTimeoutMs
-    val bufferSize = consumerConfig.socketReceiveBufferBytes
-    val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
-
-    val connectProducer = () => {
-      new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
-    }
-    val zkConnect = Option(consumerConfig.zkConnect)
-      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
-    val connectZk = () => {
-      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
-    }
-    val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
-    val jobId = config.getJobId.getOrElse("1")
-    val bootstrapServers = producerConfig.bootsrapServers
-    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout)
-    val checkpointTopic = getTopic(jobName, jobId)
-
-    // Find out the SSPGrouperFactory class so it can be included/verified in the key
-    val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory
-
-    new KafkaCheckpointManager(
-      clientId,
-      checkpointTopic,
-      systemName,
-      replicationFactor,
-      socketTimeout,
-      bufferSize,
-      fetchSize,
-      metadataStore,
-      connectProducer,
-      connectZk,
-      systemStreamPartitionGrouperFactoryString,
-      checkpointTopicProperties = getCheckpointTopicProperties(config))
-  }
-
-  private def getTopic(jobName: String, jobId: String) =
-    "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 02a6ea9..a1de887 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -24,7 +24,6 @@ import java.util.regex.Pattern
 
 import org.apache.samza.util.Util
 import org.apache.samza.util.Logging
-
 import scala.collection.JavaConversions._
 import kafka.consumer.ConsumerConfig
 import java.util.{Properties, UUID}
@@ -41,10 +40,6 @@ object KafkaConfig {
   val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
   val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
 
-  val CHECKPOINT_SYSTEM = "task.checkpoint.system"
-  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
-  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
-
   val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
   val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
   // The default segment size to use for changelog topics
@@ -63,11 +58,6 @@ object KafkaConfig {
 }
 
 class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
-  // checkpoints
-  def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
-  def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
-  def getCheckpointSegmentBytes() = getOption(KafkaConfig.CHECKPOINT_SEGMENT_BYTES)
-
   // custom consumer config
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
 

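With the task.checkpoint.* knobs removed from KafkaConfig, jobs point at a coordinator system instead; the TestJobCoordinator config earlier in this patch shows the shape. A hedged sketch using the same key constants (system and stream names are illustrative; JavaConversions assumed in scope):

    import scala.collection.JavaConversions._
    val config = new MapConfig(Map(
      JobConfig.JOB_NAME -> "my-job",
      JobConfig.JOB_COORDINATOR_SYSTEM -> "kafka", // checkpoints now flow through this system's coordinator stream
      TaskConfig.INPUT_STREAMS -> "kafka.my-input"))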
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index a34c3f2..78467bf 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -23,12 +23,12 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ ZkUtils, ZKStringSerializer }
 import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
 import org.apache.samza.SamzaException
+import org.apache.samza.util.Util
 import collection.JavaConversions._
 import org.apache.samza.util.Logging
 import scala.collection._
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.system.SystemStream
-import org.apache.samza.util.Util
 import scala.util.Sorting
 
 /**

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index f783c57..35086f5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -94,6 +94,24 @@ class KafkaSystemAdmin(
   brokerListString: String,
 
   /**
+   * A method that returns a ZkClient for the Kafka system. This is invoked
+   * when the system admin is attempting to create a coordinator stream.
+   */
+  connectZk: () => ZkClient,
+
+  /**
+   * Custom properties to use when the system admin tries to create a new
+   * coordinator stream.
+   */
+  coordinatorStreamProperties: Properties = new Properties,
+
+  /**
+   * The replication factor to use when the system admin creates a new
+   * coordinator stream.
+   */
+  coordinatorStreamReplicationFactor: Int = 1,
+
+  /**
    * The timeout to use for the simple consumer when fetching metadata from
    * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
    */
@@ -113,12 +131,6 @@ class KafkaSystemAdmin(
   clientId: String = UUID.randomUUID.toString,
 
   /**
-   * A function that returns a Zookeeper client to connect to fetch the meta
-   * data information
-   */
-  connectZk: () => ZkClient,
-
-  /**
    * Replication factor for the Changelog topic in kafka
    * Kafka properties to be used during the Changelog topic creation
    */
@@ -201,8 +213,39 @@ class KafkaSystemAdmin(
       (exception, loop) => {
         warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
         debug("Exception detail:", exception)
-      }
-    ).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
+      }).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
+  }
+
+  def createCoordinatorStream(streamName: String) {
+    info("Attempting to create coordinator stream %s." format streamName)
+    new ExponentialSleepStrategy(initialDelayMs = 500).run(
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.createTopic(
+            zkClient,
+            streamName,
+            1, // Always one partition for coordinator stream.
+            coordinatorStreamReplicationFactor,
+            coordinatorStreamProperties)
+        } finally {
+          zkClient.close
+        }
+
+        info("Created coordinator stream %s." format streamName)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: TopicExistsException =>
+            info("Coordinator stream %s already exists." format streamName)
+            loop.done
+          case e: Exception =>
+            warn("Failed to create topic %s: %s. Retrying." format (streamName, e))
+            debug("Exception detail:", e)
+        }
+      })
   }
 
   /**

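createCoordinatorStream above reuses Samza's ExponentialSleepStrategy retry idiom: the first callback does the work and calls loop.done on success; the second decides, per exception, whether to stop or let the strategy back off and retry. A minimal sketch with placeholder work (doWork is hypothetical):

    new ExponentialSleepStrategy(initialDelayMs = 500).run(
      loop => {
        doWork()   // hypothetical operation that may throw
        loop.done  // mark success so run() returns
      },
      (exception, loop) => exception match {
        case e: TopicExistsException => loop.done // benign: treat as success
        case e: Exception => // swallow and let the strategy sleep and retry
      })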
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index c84ceb7..1629035 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,10 +19,13 @@
 
 package org.apache.samza.system.kafka
 
+import java.util.Properties
+import org.apache.samza.SamzaException
 import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
 import org.apache.samza.config.Config
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.config.StorageConfig._
@@ -98,8 +101,14 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val zkConnect = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+    val connectZk = () => {
+      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    }
+    val coordinatorStreamProperties = getCoordinatorTopicProperties(config)
+    val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt
     val storeToChangelog = config.getKafkaChangelogEnabledStores()
-
    // Construct the meta information for each topic. If the replication factor is not defined, we use 2 replicas for the changelog stream.
     val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) =>
     {
@@ -112,10 +121,20 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     new KafkaSystemAdmin(
       systemName,
       bootstrapServers,
+      connectZk,
+      coordinatorStreamProperties,
+      coordinatorStreamReplicationFactor,
       timeout,
       bufferSize,
       clientId,
-      () => new ZkClient(consumerConfig.zkConnect, 6000, 6000, ZKStringSerializer),
       topicMetaInformation)
   }
+
+  def getCoordinatorTopicProperties(config: Config) = {
+    val segmentBytes = config.getCoordinatorSegmentBytes
+    (new Properties /: Map(
+      "cleanup.policy" -> "compact",
+      "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
+  }
+
 }

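getCoordinatorTopicProperties above builds a java.util.Properties with Scala's /:
(foldLeft) operator, which can be hard to read at a glance. A plainer equivalent, shown
only as a sketch (the helper name toProperties is illustrative):

    import java.util.Properties

    // Same result as (new Properties /: map) { case (props, (k, v)) => props.put(k, v); props }
    def toProperties(map: Map[String, String]): Properties = {
      val props = new Properties
      map.foreach { case (k, v) => props.put(k, v) }
      props
    }

    // e.g. toProperties(Map("cleanup.policy" -> "compact", "segment.bytes" -> "26214400"))

With cleanup.policy=compact, Kafka log-compacts the coordinator topic, so only the latest
message per key is retained.
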
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
deleted file mode 100644
index b76d5ad..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.checkpoint.kafka
-
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.junit.Assert._
-import org.junit.{Before, Test}
-
-class TestKafkaCheckpointLogKey {
-  @Before
-  def setSSPGrouperFactoryString() {
-    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("hello")
-  }
-
-  @Test
-  def checkpointKeySerializationRoundTrip() {
-    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
-    val asBytes = checkpointKey.toBytes()
-    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
-
-    assertEquals(checkpointKey, backFromBytes)
-    assertNotSame(checkpointKey, backFromBytes)
-  }
-
-  @Test
-  def changelogPartitionMappingKeySerializationRoundTrip() {
-    val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
-    val asBytes = key.toBytes()
-    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
-
-    assertEquals(key, backFromBytes)
-    assertNotSame(key, backFromBytes)
-  }
-
-  @Test
-  def differingSSPGrouperFactoriesCauseException() {
-
-    val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
-
-    val asBytes = checkpointKey.toBytes()
-
-    KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("goodbye")
-
-    var gotException = false
-    try {
-      KafkaCheckpointLogKey.fromBytes(asBytes)
-    } catch {
-      case se:SamzaException => assertEquals(new DifferingSystemStreamPartitionGrouperFactoryValues("hello", "goodbye").getMessage(), se.getCause.getMessage)
-                                gotException = true
-    }
-
-    assertTrue("Should have had an exception since ssp grouper factories didn't match", gotException)
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
deleted file mode 100644
index 7d4bea8..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.kafka
-
-import kafka.admin.AdminUtils
-import kafka.common.InvalidMessageSizeException
-import kafka.common.UnknownTopicOrPartitionException
-import kafka.message.InvalidMessageException
-
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
-import kafka.utils.TestUtils
-import kafka.utils.TestZKUtils
-import kafka.utils.Utils
-import kafka.utils.ZKStringSerializer
-import kafka.zk.EmbeddedZookeeper
-
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.config.{KafkaProducerConfig, MapConfig}
-import org.apache.samza.container.TaskName
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
-import org.apache.samza.{ SamzaException, Partition }
-import org.junit.Assert._
-import org.junit.{ AfterClass, BeforeClass, Test }
-
-import scala.collection._
-import scala.collection.JavaConversions._
-import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer}
-
-object TestKafkaCheckpointManager {
-  val checkpointTopic = "checkpoint-topic"
-  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
-  val zkConnect: String = TestZKUtils.zookeeperConnect
-  var zkClient: ZkClient = null
-  val zkConnectionTimeout = 6000
-  val zkSessionTimeout = 6000
-
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val ports = TestUtils.choosePorts(3)
-  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
-  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  props1.put("controlled.shutdown.enable", "true")
-  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  props1.put("controlled.shutdown.enable", "true")
-  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  props1.put("controlled.shutdown.enable", "true")
-
-  val config = new java.util.HashMap[String, Object]()
-  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-  config.put("acks", "all")
-  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, new Integer(1))
-  config.put(ProducerConfig.RETRIES_CONFIG, new Integer(java.lang.Integer.MAX_VALUE-1))
-  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
-  val partition = new Partition(0)
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
-  var zookeeper: EmbeddedZookeeper = null
-  var server1: KafkaServer = null
-  var server2: KafkaServer = null
-  var server3: KafkaServer = null
-  var metadataStore: TopicMetadataStore = null
-
-  val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
-
-  @BeforeClass
-  def beforeSetupServers {
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-    server1 = TestUtils.createServer(new KafkaConfig(props1))
-    server2 = TestUtils.createServer(new KafkaConfig(props2))
-    server3 = TestUtils.createServer(new KafkaConfig(props3))
-    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
-  }
-
-  @AfterClass
-  def afterCleanLogDirs {
-    server1.shutdown
-    server1.awaitShutdown()
-    server2.shutdown
-    server2.awaitShutdown()
-    server3.shutdown
-    server3.awaitShutdown()
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
-    zookeeper.shutdown
-  }
-}
-
-class TestKafkaCheckpointManager {
-  import TestKafkaCheckpointManager._
-
-  @Test
-  def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
-    val kcm = getKafkaCheckpointManager
-    val taskName = new TaskName(partition.toString)
-    kcm.register(taskName)
-    kcm.start
-    // check that log compaction is enabled.
-    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
-    val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
-    zkClient.close
-    assertEquals("compact", topicConfig.get("cleanup.policy"))
-    assertEquals("26214400", topicConfig.get("segment.bytes"))
-    // read before topic exists should result in a null checkpoint
-    var readCp = kcm.readLastCheckpoint(taskName)
-    assertNull(readCp)
-    // create topic the first time around
-    kcm.writeCheckpoint(taskName, cp1)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp1, readCp)
-    // should get an exception if partition doesn't exist
-    try {
-      readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
-      fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
-    } catch {
-      case e: SamzaException => None // expected
-      case _: Exception => fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
-    }
-    // writing a second message should work, too
-    kcm.writeCheckpoint(taskName, cp2)
-    readCp = kcm.readLastCheckpoint(taskName)
-    assertEquals(cp2, readCp)
-    kcm.stop
-  }
-
-  @Test
-  def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
-    val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
-    exceptions.foreach { exceptionName =>
-      val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
-      val taskName = new TaskName(partition.toString)
-      kcm.register(taskName)
-      kcm.start
-      kcm.writeCheckpoint(taskName, cp1)
-      // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
-      try {
-        kcm.readLastCheckpoint(taskName)
-        fail("Expected a KafkaCheckpointException.")
-      } catch {
-        case e: KafkaCheckpointException => None
-        }
-      kcm.stop
-    }
-  }
-
-  private def getKafkaCheckpointManager = new KafkaCheckpointManager(
-    clientId = "some-client-id",
-    checkpointTopic = checkpointTopic,
-    systemName = "kafka",
-    replicationFactor = 3,
-    socketTimeout = 30000,
-    bufferSize = 64 * 1024,
-    fetchSize = 300 * 1024,
-    metadataStore = metadataStore,
-    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
-    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
-    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
-
-  // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
-  private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
-    clientId = "some-client-id-invalid-serde",
-    checkpointTopic = serdeCheckpointTopic,
-    systemName = "kafka",
-    replicationFactor = 3,
-    socketTimeout = 30000,
-    bufferSize = 64 * 1024,
-    fetchSize = 300 * 1024,
-    metadataStore = metadataStore,
-    connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
-    connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
-    systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
-    serde = new InvalideSerde(exception),
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
-
-  class InvalideSerde(exception: String) extends CheckpointSerde {
-    override def fromBytes(bytes: Array[Byte]): Checkpoint = {
-      exception match {
-        case "InvalidMessageException" => throw new InvalidMessageException
-        case "InvalidMessageSizeException" => throw new InvalidMessageSizeException
-        case "UnknownTopicOrPartitionException" => throw new UnknownTopicOrPartitionException
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 0380d35..d260f2d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -100,15 +100,15 @@ object TestKafkaSystemAdmin {
       REPLICATION_FACTOR)
   }
 
-  def validateTopic(expectedPartitionCount: Int) {
+  def validateTopic(topic: String, expectedPartitionCount: Int) {
     var done = false
     var retries = 0
     val maxRetries = 100
 
     while (!done && retries < maxRetries) {
       try {
-        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(TOPIC), "kafka", metadataStore.getTopicInfo)
-        val topicMetadata = topicMetadataMap(TOPIC)
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(topic)
         val errorCode = topicMetadata.errorCode
 
         KafkaUtil.maybeThrowException(errorCode)
@@ -207,10 +207,9 @@ class TestKafkaSystemAdmin {
 
   @Test
   def testShouldGetOldestNewestAndNextOffsets {
-
     // Create an empty topic with 50 partitions, but with no offsets.
     createTopic
-    validateTopic(50)
+    validateTopic(TOPIC, 50)
 
     // Verify the empty topic behaves as expected.
     var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
@@ -271,7 +270,6 @@ class TestKafkaSystemAdmin {
 
   @Test
   def testNonExistentTopic {
-
     val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
     val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
@@ -289,7 +287,21 @@ class TestKafkaSystemAdmin {
     assertEquals("3", offsetsAfter(ssp2))
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
+  @Test
+  def testShouldCreateCoordinatorStream {
+    val topic = "test-coordinator-stream"
+    val systemAdmin = new KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
+    systemAdmin.createCoordinatorStream(topic)
+    validateTopic(topic, 1)
+    val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+    assertTrue(topicMetadataMap.contains(topic))
+    val topicMetadata = topicMetadataMap(topic)
+    val partitionMetadata = topicMetadata.partitionsMetadata.head
+    assertEquals(0, partitionMetadata.partitionId)
+    assertEquals(3, partitionMetadata.replicas.size)
+  }
+
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
     import kafka.api.{ TopicMetadata, TopicMetadataResponse }
 
     // Simulate Kafka telling us that the leader for the topic is not available

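testShouldCreateCoordinatorStream verifies the partition count and the replica count, but
not the compaction setting of the new topic. A hedged sketch of checking that as well,
using AdminUtils.fetchTopicConfig in the same way the deleted TestKafkaCheckpointManager
did (the helper name isCompacted is illustrative):

    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    // Fetch the topic's config from ZooKeeper and confirm log compaction is enabled.
    def isCompacted(zkConnect: String, topic: String): Boolean = {
      val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
      try {
        "compact" == AdminUtils.fetchTopicConfig(zkClient, topic).get("cleanup.policy")
      } finally {
        zkClient.close()
      }
    }
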
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index d3f25c0..d3616fe 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -28,9 +28,13 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -177,7 +181,8 @@ public class StreamAppender extends AppenderSkeleton {
 
     try {
       if (isApplicationMaster) {
-        config = SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_CONFIG()), Config.class);
+        Config coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()), Config.class));
+        config = JobCoordinator.apply(coordinatorSystemConfig).jobModel().getConfig();
       } else {
         String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
         config = SamzaObjectMapper.getObjectMapper().readValue(Util.read(new URL(url), 30000), JobModel.class).getConfig();

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/join/common.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/common.properties b/samza-test/src/main/config/join/common.properties
index ad10aac..ac87e81 100644
--- a/samza-test/src/main/config/join/common.properties
+++ b/samza-test/src/main/config/join/common.properties
@@ -23,7 +23,7 @@ yarn.package.path=<YARN.PACKAGE.PATH>
 
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka-checkpoints
+task.checkpoint.system=kafka
 task.checkpoint.replication.factor=1
 
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
@@ -36,11 +36,5 @@ systems.kafka.producer.bootstrap.servers=localhost:9092
 systems.kafka.samza.key.serde=string
 systems.kafka.samza.msg.serde=string
 
- # Checkpoints System
-systems.kafka-checkpoints.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka-checkpoints.producer.bootstrap.servers=localhost:9092
-systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181
-
 yarn.container.retry.count=-1
 yarn.container.retry.window.ms=60000
-

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties
index b9f898c..26fa1e3 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -38,3 +38,6 @@ systems.kafka.samza.key.serde=string
 systems.kafka.samza.offset.default=oldest
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
+
+#Coordinator replication factor
+job.coordinator.replication.factor=1


[4/4] samza git commit: SAMZA-465: Use coordinator stream and eliminate CheckpointManager

Posted by ni...@apache.org.
SAMZA-465: Use coordinator stream and eliminate CheckpointManager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/23fb2e1c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/23fb2e1c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/23fb2e1c

Branch: refs/heads/master
Commit: 23fb2e1c097317845439ec0abb938821a6106969
Parents: c37d752
Author: Naveen Somasundaram <na...@gmail.com>
Authored: Tue Apr 28 19:57:06 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-ld1.linkedin.biz>
Committed: Tue Apr 28 19:57:06 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   6 +-
 .../org/apache/samza/checkpoint/Checkpoint.java |  72 ---
 .../samza/checkpoint/CheckpointManager.java     |  70 ---
 .../checkpoint/CheckpointManagerFactory.java    |  30 --
 .../org/apache/samza/container/TaskName.java    |   4 +-
 .../org/apache/samza/system/SystemAdmin.java    |  29 +-
 .../org/apache/samza/task/TaskCoordinator.java  |   2 +-
 ...inglePartitionWithoutOffsetsSystemAdmin.java |   5 +
 .../org/apache/samza/checkpoint/Checkpoint.java |  72 +++
 .../samza/checkpoint/CheckpointManager.java     | 116 +++++
 .../stream/CoordinatorStreamMessage.java        | 476 +++++++++++++++++++
 .../stream/CoordinatorStreamSystemConsumer.java | 197 ++++++++
 .../stream/CoordinatorStreamSystemProducer.java | 134 ++++++
 .../org/apache/samza/job/model/TaskModel.java   |  72 +--
 .../serializers/model/JsonTaskModelMixIn.java   |   8 +-
 .../serializers/model/SamzaObjectMapper.java    |  24 +
 .../storage/ChangelogPartitionManager.java      | 115 +++++
 .../samza/checkpoint/CheckpointTool.scala       |  34 +-
 .../apache/samza/checkpoint/OffsetManager.scala |  58 +--
 .../file/FileSystemCheckpointManager.scala      | 107 -----
 .../org/apache/samza/config/JobConfig.scala     |  63 ++-
 .../samza/config/ShellCommandConfig.scala       |   4 +-
 .../org/apache/samza/config/StreamConfig.scala  |   2 -
 .../org/apache/samza/config/TaskConfig.scala    |   2 +-
 .../apache/samza/container/SamzaContainer.scala |  18 +-
 .../samza/coordinator/JobCoordinator.scala      | 270 +++++++----
 .../samza/coordinator/server/HttpServer.scala   |   3 +-
 .../samza/coordinator/server/JobServlet.scala   |   7 +-
 .../stream/CoordinatorStreamSystemFactory.scala |  50 ++
 .../scala/org/apache/samza/job/JobRunner.scala  |  77 +--
 .../samza/job/local/ProcessJobFactory.scala     |   9 +-
 .../samza/job/local/ThreadJobFactory.scala      |   5 +-
 .../apache/samza/serializers/JsonSerde.scala    |  30 +-
 .../filereader/FileReaderSystemAdmin.scala      |   8 +-
 .../main/scala/org/apache/samza/util/Util.scala | 131 ++++-
 .../MockCoordinatorStreamSystemFactory.java     | 118 +++++
 .../MockCoordinatorStreamWrappedConsumer.java   | 128 +++++
 .../stream/TestCoordinatorStreamMessage.java    |  66 +++
 .../TestCoordinatorStreamSystemConsumer.java    | 133 ++++++
 .../TestCoordinatorStreamSystemProducer.java    | 153 ++++++
 .../model/TestSamzaObjectMapper.java            |   6 +-
 samza-core/src/test/resources/test.properties   |   1 +
 .../samza/checkpoint/TestCheckpointTool.scala   |  20 +-
 .../samza/checkpoint/TestOffsetManager.scala    |  50 +-
 .../file/TestFileSystemCheckpointManager.scala  |  86 ----
 .../samza/container/TestSamzaContainer.scala    |  21 +-
 .../task/TestGroupByContainerCount.scala        |   2 +-
 .../samza/coordinator/TestJobCoordinator.scala  | 238 ++++++++--
 .../coordinator/server/TestHttpServer.scala     |   2 +-
 .../apache/samza/job/local/TestProcessJob.scala |   2 +-
 .../samza/serializers/TestJsonSerde.scala       |   4 +-
 .../kafka/KafkaCheckpointLogKey.scala           | 186 --------
 .../kafka/KafkaCheckpointManager.scala          | 427 -----------------
 .../kafka/KafkaCheckpointManagerFactory.scala   | 116 -----
 .../org/apache/samza/config/KafkaConfig.scala   |  10 -
 .../samza/config/RegExTopicGenerator.scala      |   2 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  59 ++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  23 +-
 .../kafka/TestKafkaCheckpointLogKey.scala       |  71 ---
 .../kafka/TestKafkaCheckpointManager.scala      | 211 --------
 .../system/kafka/TestKafkaSystemAdmin.scala     |  26 +-
 .../samza/logging/log4j/StreamAppender.java     |   7 +-
 .../src/main/config/join/common.properties      |   8 +-
 .../src/main/config/negate-number.properties    |   3 +
 .../perf/container-performance.properties       |  12 +-
 .../kafka-read-write-performance.properties     |   3 +
 .../samza/system/mock/MockSystemAdmin.java      |  18 +-
 .../src/main/python/samza_failure_testing.py    |   2 +
 .../test/performance/TestPerformanceTask.scala  |   4 +-
 .../test/integration/TestStatefulTask.scala     |  17 +-
 .../org/apache/samza/config/YarnConfig.scala    |   3 -
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  12 +-
 .../samza/job/yarn/SamzaAppMasterState.scala    |   3 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  12 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   9 +-
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |   8 +-
 .../job/yarn/TestSamzaAppMasterService.scala    |  15 +-
 .../yarn/TestSamzaAppMasterTaskManager.scala    |  42 +-
 78 files changed, 2815 insertions(+), 1834 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 97de3a2..ebad6eb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -123,7 +123,9 @@ project(':samza-api') {
 
 project(":samza-core_$scalaVersion") {
   apply plugin: 'scala'
-
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.main.java.srcDirs = []
   jar {
     manifest {
       attributes("Implementation-Version": "$version")
@@ -239,6 +241,7 @@ project(":samza-yarn_$scalaVersion") {
     compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
   }
 
   repositories {
@@ -384,6 +387,7 @@ project(":samza-test_$scalaVersion") {
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "com.101tec:zkclient:$zkClientVersion"
     testCompile project(":samza-kafka_$scalaVersion")
+    testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
deleted file mode 100644
index 593d118..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
-
-  /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
-   */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
-
-  /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
-   */
-  public Map<SystemStreamPartition, String> getOffsets() {
-    return Collections.unmodifiableMap(offsets);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof Checkpoint)) return false;
-
-    Checkpoint that = (Checkpoint) o;
-
-    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    return offsets != null ? offsets.hashCode() : 0;
-  }
-
-  @Override
-  public String toString() {
-    return "Checkpoint [offsets=" + offsets + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
deleted file mode 100644
index 092cb91..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.container.TaskName;
-
-import java.util.Map;
-
-/**
- * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
- * implementation-specific location.
- */
-public interface CheckpointManager {
-  public void start();
-
-  /**
-   * Registers this manager to write checkpoints of a specific Samza stream partition.
-   * @param taskName Specific Samza taskName of which to write checkpoints for.
-   */
-  public void register(TaskName taskName);
-
-  /**
-   * Writes a checkpoint based on the current state of a Samza stream partition.
-   * @param taskName Specific Samza taskName of which to write a checkpoint of.
-   * @param checkpoint Reference to a Checkpoint object to store offset data in.
-   */
-  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);
-
-  /**
-   * Returns the last recorded checkpoint for a specified taskName.
-   * @param taskName Specific Samza taskName for which to get the last checkpoint of.
-   * @return A Checkpoint object with the recorded offset data of the specified partition.
-   */
-  public Checkpoint readLastCheckpoint(TaskName taskName);
-
-  /**
-   * Read the taskName to partition mapping that is being maintained by this CheckpointManager
-   *
-   * @return TaskName to task log partition mapping, or an empty map if there were no messages.
-   */
-  public Map<TaskName, Integer> readChangeLogPartitionMapping();
-
-  /**
-   * Write the taskName to partition mapping that is being maintained by this CheckpointManager
-   *
-   * @param mapping Each TaskName's partition within the changelog
-   */
-  public void writeChangeLogPartitionMapping(Map<TaskName, Integer> mapping);
-
-  public void stop();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
deleted file mode 100644
index a97ff09..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-
-/**
- * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
- */
-public interface CheckpointManagerFactory {
-  public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/container/TaskName.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
index 0833586..aa19293 100644
--- a/samza-api/src/main/java/org/apache/samza/container/TaskName.java
+++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
@@ -21,7 +21,9 @@ package org.apache.samza.container;
 /**
  * A unique identifier of a set of a SystemStreamPartitions that have been grouped by
  * a {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}.  The
- * SystemStreamPartitionGrouper determines the TaskName for each set it creates.
+ * SystemStreamPartitionGrouper determines the TaskName for each set it creates. The TaskName class
+ * should contain only the task name itself. This is necessary because the ChangelogManager assumes
+ * the task name uniquely identifies the task, and uses it as the key when storing entries in the
+ * underlying coordinator stream.
  */
 public class TaskName implements Comparable<TaskName> {
   private final String taskName;

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 8995ba3..7a588eb 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -23,10 +23,9 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * Helper interface attached to an underlying system to fetch
- * information about streams, partitions, offsets, etc. This interface is useful
- * for providing utility methods that Samza needs in order to interact with a
- * system.
+ * Helper interface attached to an underlying system to fetch information about
+ * streams, partitions, offsets, etc. This interface is useful for providing
+ * utility methods that Samza needs in order to interact with a system.
  */
 public interface SystemAdmin {
 
@@ -51,10 +50,22 @@ public interface SystemAdmin {
    */
   Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
 
-    /**
-     * An API to create a change log stream
-     * @param streamName The name of the stream to be created in the underlying stream
-     * @param numOfPartitions The number of partitions in the changelog stream
-     */
+  /**
+   * An API to create a change log stream
+   * 
+   * @param streamName
+   *          The name of the stream to be created in the underlying stream
+   * @param numOfPartitions
+   *          The number of partitions in the changelog stream
+   */
   void createChangelogStream(String streamName, int numOfPartitions);
+
+  /**
+   * Create a stream for the job coordinator. If the stream already exists, this
+   * call should simply return.
+   * 
+   * @param streamName
+   *          The name of the coordinator stream to create.
+   */
+  void createCoordinatorStream(String streamName);
 }

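The contract for the new createCoordinatorStream method is that creating a stream which
already exists is a no-op rather than an error. A toy in-memory sketch of that
idempotency contract (the class is illustrative and implements only this one method, not
the full SystemAdmin interface):

    import scala.collection.mutable

    class InMemoryAdmin {
      private val streams = mutable.Set[String]()

      // Second and later calls for the same stream simply return.
      def createCoordinatorStream(streamName: String) {
        if (streams.add(streamName))
          println("Created coordinator stream %s with 1 partition." format streamName)
        else
          println("Coordinator stream %s already exists; nothing to do." format streamName)
      }
    }
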
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
index 6ff1a55..e7bf6f1 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
@@ -22,7 +22,7 @@ package org.apache.samza.task;
 /**
  * TaskCoordinators are provided to the process methods of {@link org.apache.samza.task.StreamTask} implementations
  * to allow the user code to request actions from the Samza framework, including committing the current checkpoints
- * to configured {@link org.apache.samza.checkpoint.CheckpointManager}s or shutting down the task or all tasks within
+ * to configured org.apache.samza.checkpoint.CheckpointManager or shutting down the task or all tasks within
  * a container.
  * <p>
  *   This interface may evolve over time.

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 01997ae..a6b14fb 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -69,4 +69,9 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
 
     return offsetsAfter;
   }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
new file mode 100644
index 0000000..593d118
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+public class Checkpoint {
+  private final Map<SystemStreamPartition, String> offsets;
+
+  /**
+   * Constructs a new checkpoint based off a map of Samza stream offsets.
+   * @param offsets Map of Samza streams to their current offset.
+   */
+  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
+    this.offsets = offsets;
+  }
+
+  /**
+   * Gets an unmodifiable view of the current Samza stream offsets.
+   * @return An unmodifiable view of a Map of Samza streams to their recorded offsets.
+   */
+  public Map<SystemStreamPartition, String> getOffsets() {
+    return Collections.unmodifiableMap(offsets);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Checkpoint)) return false;
+
+    Checkpoint that = (Checkpoint) o;
+
+    if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return offsets != null ? offsets.hashCode() : 0;
+  }
+
+  @Override
+  public String toString() {
+    return "Checkpoint [offsets=" + offsets + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
new file mode 100644
index 0000000..3ac63ca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The CheckpointManager is used to persist and restore checkpoint information, using the
+ * coordinator stream as its underlying storage.
+ */
+public class CheckpointManager {
+
+  private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
+  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+  private final Map<TaskName, Checkpoint> taskNamesToOffsets;
+  private final HashSet<TaskName> taskNames;
+  private String source;
+
+  public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+    this.coordinatorStreamProducer = coordinatorStreamProducer;
+    taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
+    taskNames = new HashSet<TaskName>();
+    this.source = "Unknown";
+  }
+
+  public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+      String source) {
+    this(coordinatorStreamProducer, coordinatorStreamConsumer);
+    this.source = source;
+  }
+
+  public void start() {
+    coordinatorStreamProducer.start();
+    coordinatorStreamConsumer.start();
+  }
+
+  /**
+   * Registers this manager to read and write checkpoints for a specific Samza task.
+   * @param taskName The Samza taskName for which to read and write checkpoints.
+   */
+  public void register(TaskName taskName) {
+    log.debug("Adding taskName {} to {}", taskName, this);
+    taskNames.add(taskName);
+    coordinatorStreamConsumer.register();
+    coordinatorStreamProducer.register(taskName.getTaskName());
+  }
+
+  /**
+   * Writes a checkpoint for a specific Samza task, based on its current state.
+   * @param taskName The Samza taskName for which to write a checkpoint.
+   * @param checkpoint Reference to a Checkpoint object to store offset data in.
+   */
+  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
+    log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
+    SetCheckpoint checkPointMessage = new SetCheckpoint(source, taskName.getTaskName(), checkpoint);
+    coordinatorStreamProducer.send(checkPointMessage);
+  }
+
+  /**
+   * Returns the last recorded checkpoint for a specified taskName.
+   * @param taskName The Samza taskName for which to get the last checkpoint.
+   * @return A Checkpoint object with the recorded offset data of the specified task.
+   */
+  public Checkpoint readLastCheckpoint(TaskName taskName) {
+    // Bootstrap each time to make sure we are caught up with the stream; on consecutive calls the bootstrap only reads what is new.
+    log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
+    Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetCheckpoint.TYPE);
+    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+      SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
+      TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
+      if (taskNames.contains(taskNameInCheckpoint)) {
+        taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint());
+        log.debug("Adding checkpoint {} for taskName {}", setCheckpoint.getCheckpoint(), taskNameInCheckpoint);
+      }
+    }
+    return taskNamesToOffsets.get(taskName);
+  }
+
+  public void stop() {
+    coordinatorStreamConsumer.stop();
+    coordinatorStreamProducer.stop();
+  }
+}

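A hedged sketch of how the new CheckpointManager is meant to be used for a write/read
round trip. The CoordinatorStreamSystemProducer/Consumer are taken as parameters here
because their construction (via the CoordinatorStreamSystemFactory listed above) is not
shown in this excerpt:

    import org.apache.samza.Partition
    import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
    import org.apache.samza.container.TaskName
    import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer}
    import org.apache.samza.system.SystemStreamPartition
    import scala.collection.JavaConversions._

    def checkpointRoundTrip(producer: CoordinatorStreamSystemProducer,
                            consumer: CoordinatorStreamSystemConsumer): Checkpoint = {
      val manager = new CheckpointManager(producer, consumer, "example-source")
      val taskName = new TaskName("Partition 0")
      manager.register(taskName)
      manager.start()
      val ssp = new SystemStreamPartition("kafka", "PageViewEvent", new Partition(0))
      // Checkpoints go out as SetCheckpoint messages on the coordinator stream...
      manager.writeCheckpoint(taskName, new Checkpoint(Map(ssp -> "42")))
      // ...and are read back by bootstrapping the stream and filtering on registered tasks.
      val restored = manager.readLastCheckpoint(taskName)
      manager.stop()
      restored
    }
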
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
new file mode 100644
index 0000000..f8b705f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Represents a message for the job coordinator. All messages in the coordinator
+ * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
+ * messages are modeled as key/value pairs. The key is a list of well defined
+ * fields: version, type, and key. The value is a map. There are some
+ * pre-defined fields (such as timestamp, host, etc) for the value map, which
+ * are common to all messages.
+ * </p>
+ * 
+ * <p>
+ * The full structure for a CoordinatorStreamMessage is:
+ * </p>
+ * 
+ * <pre>
+ * key =&gt; [1, "set-config", "job.name"] 
+ * 
+ * message =&gt; {
+ *   "host": "192.168.0.1",
+ *   "username": "criccomini",
+ *   "source": "job-runner",
+ *   "timestamp": 123456789,
+ *   "values": {
+ *     "value": "my-job-name"
+ *   } 
+ * }
+ * </pre>
+ * 
+ * Where the key's structure is:
+ * 
+ * <pre>
+ * key =&gt; [&lt;version&gt;, &lt;type&gt;, &lt;key&gt;]
+ * </pre>
+ * 
+ * <p>
+ * Note that the white space in the above JSON blobs is for legibility only.
+ * Over the wire, the JSON should be compact, with no unnecessary white
+ * space. This is extremely important for key serialization, since a
+ * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
+ * be evaluated as two different keys, and Kafka will not log compact them (if
+ * Kafka is used as the underlying system for a coordinator stream).
+ * </p>
+ * 
+ * <p>
+ * The "values" map in the message is defined on a per-message-type basis. For
+ * set-config messages, there is just a single key/value pair, where the "value"
+ * key is defined. For offset messages, there will be multiple key/values pairs
+ * in "values" (one for each SystemStreamPartition/offset pair for a given
+ * TaskName).
+ * </p>
+ * 
+ * <p>
+ * The most important fields are type, key, and values. The type field (defined
+ * as index 1 in the key list) defines the kind of message, the key (defined as
+ * index 2 in the key list) defines a key to associate with the values, and the
+ * values map defines a set of values associated with the type. A concrete
+ * example would be a config message of type "set-config" with key "job.name"
+ * and values {"value": "my-job-name"}.
+ * </p>
+ */
+public class CoordinatorStreamMessage {
+  public static int VERSION_INDEX = 0;
+  public static int TYPE_INDEX = 1;
+  public static int KEY_INDEX = 2;
+
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
+
+  /**
+   * Protocol version for coordinator stream messages. This version number must
+   * be incremented any time new messages are added to the coordinator stream,
+   * or changes are made to the key/message headers.
+   */
+  public static final int VERSION = 1;
+
+  /**
+   * Contains all key fields. Currently, this includes the type of the message,
+   * the key associated with the type (e.g. type: set-config key: job.name), and
+   * the version of the protocol. The indices are defined as the INDEX static
+   * variables above.
+   */
+  private final Object[] keyArray;
+
+  /**
+   * Contains all fields for the message. This includes who sent the message,
+   * the host, etc. It also includes a "values" map, which contains all values
+   * associated with the key of the message. If set-config/job.name were used as
+   * the type/key of the message, then values would contain
+   * {"value":"my-job-name"}.
+   */
+  private final Map<String, Object> messageMap;
+  private boolean isDelete;
+
+  public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
+    this(message.getKeyArray(), message.getMessageMap());
+  }
+
+  public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
+    this.keyArray = keyArray;
+    this.messageMap = messageMap;
+    this.isDelete = messageMap == null;
+  }
+
+  public CoordinatorStreamMessage(String source) {
+    this(source, new Object[] { Integer.valueOf(VERSION), null, null }, new HashMap<String, Object>());
+  }
+
+  public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
+    this(keyArray, messageMap);
+    if (!isDelete) {
+      this.messageMap.put("values", new HashMap<String, String>());
+      setSource(source);
+      setUsername(System.getProperty("user.name"));
+      setTimestamp(System.currentTimeMillis());
+
+      try {
+        setHost(InetAddress.getLocalHost().getHostAddress());
+      } catch (UnknownHostException e) {
+        log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
+        setHost("");
+      }
+    }
+
+    setVersion(VERSION);
+  }
+
+  protected void setIsDelete(boolean isDelete) {
+    this.isDelete = isDelete;
+  }
+
+  protected void setHost(String host) {
+    messageMap.put("host", host);
+  }
+
+  protected void setUsername(String username) {
+    messageMap.put("username", username);
+  }
+
+  protected void setSource(String source) {
+    messageMap.put("source", source);
+  }
+
+  protected void setTimestamp(long timestamp) {
+    messageMap.put("timestamp", Long.valueOf(timestamp));
+  }
+
+  protected void setVersion(int version) {
+    this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
+  }
+
+  protected void setType(String type) {
+    this.keyArray[TYPE_INDEX] = type;
+  }
+
+  protected void setKey(String key) {
+    this.keyArray[KEY_INDEX] = key;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected Map<String, String> getMessageValues() {
+    return (Map<String, String>) this.messageMap.get("values");
+  }
+
+  protected String getMessageValue(String key) {
+    return getMessageValues().get(key);
+  }
+
+  /**
+   * @param key
+   *           The key inside the messageMap. Please use only human-readable
+   *           strings (no JSON or the like); this allows easy mutation of the
+   *           coordinator stream outside of Samza (e.g. from scripts).
+   * @param value
+   *           The value corresponding to the key; it should also be a simple string.
+   */
+  protected void putMessageValue(String key, String value) {
+    getMessageValues().put(key, value);
+  }
+
+  /**
+   * The type of the message is used to convert a generic
+   * CoordinatorStreamMessage into a specific message, such as a SetConfig
+   * message.
+   * 
+   * @return The type of the message.
+   */
+  public String getType() {
+    return (String) this.keyArray[TYPE_INDEX];
+  }
+
+  /**
+   * @return The whole key array, containing the version, type, and key of the message.
+   */
+  public Object[] getKeyArray() {
+    return this.keyArray;
+  }
+
+  /**
+   * @return Whether the message signifies a delete or not.
+   */
+  public boolean isDelete() {
+    return isDelete;
+  }
+
+  /**
+   * @return The username of the user that sent the message.
+   */
+  public String getUsername() {
+    return (String) this.messageMap.get("username");
+  }
+
+  /**
+   * @return The timestamp (in epoch milliseconds) at which the message was created.
+   */
+  public long getTimestamp() {
+    return (Long) this.messageMap.get("timestamp");
+  }
+
+  /**
+   * @return The whole message map including header information.
+   */
+  public Map<String, Object> getMessageMap() {
+    if (!isDelete) {
+      Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
+      // To make sure the values map is also immutable, we overwrite it with an unmodifiable view of the values map.
+      immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
+      return Collections.unmodifiableMap(immutableMap);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * @return The source that sent the coordinator message. This is a string
+   *         defined by the sender.
+   */
+  public String getSource() {
+    return (String) this.messageMap.get("source");
+  }
+
+  /**
+   * @return The protocol version that the message conforms to.
+   */
+  public int getVersion() {
+    return (Integer) this.keyArray[VERSION_INDEX];
+  }
+
+  /**
+   * @return The key for a message. The key's meaning is defined by the type of
+   *         the message.
+   */
+  public String getKey() {
+    return (String) this.keyArray[KEY_INDEX];
+  }
+
+  @Override
+  public String toString() {
+    return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (isDelete ? 1231 : 1237);
+    result = prime * result + Arrays.hashCode(keyArray);
+    result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
+    if (isDelete != other.isDelete)
+      return false;
+    if (!Arrays.equals(keyArray, other.keyArray))
+      return false;
+    if (messageMap == null) {
+      if (other.messageMap != null)
+        return false;
+    } else if (!messageMap.equals(other.messageMap))
+      return false;
+    return true;
+  }
+
+  /**
+   * A coordinator stream message that tells the job coordinator to set a
+   * specific configuration.
+   */
+  public static class SetConfig extends CoordinatorStreamMessage {
+    public static final String TYPE = "set-config";
+
+    public SetConfig(CoordinatorStreamMessage message) {
+      super(message.getKeyArray(), message.getMessageMap());
+    }
+
+    public SetConfig(String source, String key, String value) {
+      super(source);
+      setType(TYPE);
+      setKey(key);
+      putMessageValue("value", value);
+    }
+
+    public String getConfigValue() {
+      return getMessageValue("value");
+    }
+  }
+
+  public static class Delete extends CoordinatorStreamMessage {
+    public Delete(String source, String key, String type) {
+      this(source, key, type, VERSION);
+    }
+
+    /**
+     * <p>
+     * Delete messages must take the type of another CoordinatorStreamMessage
+     * (e.g. SetConfig) to define the type of message that's being deleted.
+     * Considering Kafka's log compaction, for example, the key of a message
+     * and the key of its delete message must match exactly:
+     * </p>
+     * 
+     * <pre>
+     * k=&gt;[1,"set-config","job.name"] .. v=&gt; {..some stuff..}
+     * k=&gt;[1,"set-config","job.name"] .. v=&gt; null
+     * </pre>
+     * 
+     * <p>
+     * Deletes are modeled as a CoordinatorStreamMessage with a null message
+     * map, and a key that's identical to the key map that's to be deleted.
+     * </p>
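+     * 
+     * <p>
+     * For example, to delete the set-config entry for "job.name" (the source
+     * name "job-runner" is illustrative only):
+     * </p>
+     * 
+     * <pre>
+     * new Delete("job-runner", "job.name", SetConfig.TYPE)
+     * </pre>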
+     * 
+     * @param source
+     *          The source ID of the sender of the delete message.
+     * @param key
+     *          The key to delete.
+     * @param type
+     *          The type of message to delete. Must correspond to one of the
+     *          other CoordinatorStreamMessages.
+     * @param version
+     *          The protocol version.
+     */
+    public Delete(String source, String key, String type, int version) {
+      super(source);
+      setType(type);
+      setKey(key);
+      setVersion(version);
+      setIsDelete(true);
+    }
+  }
+
+  /**
+   * The SetCheckpoint is used to store the checkpoint messages for a particular task.
+   * The structure looks like:
+   * {
+   * Key: TaskName
+   * Type: set-checkpoint
+   * Source: ContainerID
+   * MessageMap:
+   *  {
+   *     SSP1 : offset,
+   *     SSP2 : offset
+   *  }
+   * }
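+   *
+   * A usage sketch (the source and task name are illustrative only):
+   *
+   *   new SetCheckpoint("container-0", "Partition 0", checkpoint)
+   *
+   * where "checkpoint" is a Checkpoint holding the SSP-to-offset map to persist.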
+   */
+  public static class SetCheckpoint extends CoordinatorStreamMessage {
+    public static final String TYPE = "set-checkpoint";
+
+    public SetCheckpoint(CoordinatorStreamMessage message) {
+      super(message.getKeyArray(), message.getMessageMap());
+    }
+
+    /**
+     *
+     * @param source The source writing the checkpoint
+     * @param key The key for the checkpoint message (typically the task name)
+     * @param checkpoint Checkpoint message to be written to the stream
+     */
+    public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
+      super(source);
+      setType(TYPE);
+      setKey(key);
+      Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
+      for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
+        putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
+      }
+    }
+
+    public Checkpoint getCheckpoint() {
+      Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
+      for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
+        offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
+      }
+      return new Checkpoint(offsetMap);
+    }
+  }
+
+  /**
+   * The SetChangelogMapping message is used to store the changelog partition information for a particular task.
+   * The structure looks like:
+   * {
+   * Key: TaskName
+   * Type: set-changelog
+   * Source: ContainerID
+   * MessageMap:
+   *  {
+   *     "Partition" : partitionNumber (They key is just a dummy key here, the value contains the actual partition)
+   *  }
+   * }
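+   *
+   * A usage sketch (the source, task name, and partition are illustrative only):
+   *
+   *   new SetChangelogMapping("job-coordinator", "Partition 0", 0)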
+   */
+  public static class SetChangelogMapping extends CoordinatorStreamMessage {
+    public static final String TYPE = "set-changelog";
+
+    public SetChangelogMapping(CoordinatorStreamMessage message) {
+      super(message.getKeyArray(), message.getMessageMap());
+    }
+
+    /**
+     *
+     * @param source Source writing the change log mapping
+     * @param taskName The task name to be used in the mapping
+     * @param changelogPartitionNumber The partition to which the task's changelog is mapped
+     */
+    public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
+      super(source);
+      setType(TYPE);
+      setKey(taskName);
+      putMessageValue("Partition", String.valueOf(changelogPartitionNumber));
+    }
+
+    public String getTaskName() {
+      return getKey();
+    }
+
+    public int getPartition() {
+      return Integer.parseInt(getMessageValue("Partition"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..2134603
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemConsumer that provides helpful methods for dealing
+ * with the coordinator stream.
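+ * 
+ * <p>
+ * A typical lifecycle, sketched below (how the SystemConsumer and SystemAdmin
+ * are obtained is system-specific and illustrative only):
+ * </p>
+ * 
+ * <pre>
+ * CoordinatorStreamSystemConsumer consumer =
+ *     new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin);
+ * consumer.register();  // register at the oldest offset
+ * consumer.start();     // start the underlying SystemConsumer
+ * consumer.bootstrap(); // read the stream from oldest to latest
+ * Config config = consumer.getConfig();
+ * consumer.stop();
+ * </pre>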
+ */
+public class CoordinatorStreamSystemConsumer {
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemConsumer.class);
+
+  private final Serde<List<?>> keySerde;
+  private final Serde<Map<String, Object>> messageSerde;
+  private final SystemStreamPartition coordinatorSystemStreamPartition;
+  private final SystemConsumer systemConsumer;
+  private final SystemAdmin systemAdmin;
+  private final Map<String, String> configMap;
+  private boolean isBootstrapped;
+  private boolean isStarted;
+  private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new HashSet<CoordinatorStreamMessage>();
+
+  public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+    this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+    this.systemConsumer = systemConsumer;
+    this.systemAdmin = systemAdmin;
+    this.configMap = new HashMap<String, String>();
+    this.isBootstrapped = false;
+    this.keySerde = keySerde;
+    this.messageSerde = messageSerde;
+  }
+
+  public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+    this(coordinatorSystemStream, systemConsumer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+  }
+
+  /**
+   * Retrieves the oldest offset in the coordinator stream, and registers the
+   * coordinator stream with the SystemConsumer using that oldest offset.
+   */
+  public void register() {
+    log.debug("Attempting to register: {}", coordinatorSystemStreamPartition);
+    Set<String> streamNames = new HashSet<String>();
+    String streamName = coordinatorSystemStreamPartition.getStream();
+    streamNames.add(streamName);
+    Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+
+    if (systemStreamMetadataMap == null) {
+      throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal.");
+    }
+
+    SystemStreamMetadata systemStreamMetadata = systemStreamMetadataMap.get(streamName);
+
+    if (systemStreamMetadata == null) {
+      throw new SamzaException("Expected " + streamName + " to be in system stream metadata.");
+    }
+
+    SystemStreamPartitionMetadata systemStreamPartitionMetadata = systemStreamMetadata.getSystemStreamPartitionMetadata().get(coordinatorSystemStreamPartition.getPartition());
+
+    if (systemStreamPartitionMetadata == null) {
+      throw new SamzaException("Expected metadata for " + coordinatorSystemStreamPartition + " to exist.");
+    }
+
+    String startingOffset = systemStreamPartitionMetadata.getOldestOffset();
+    log.debug("Registering {} with offset {}", coordinatorSystemStreamPartition, startingOffset);
+    systemConsumer.register(coordinatorSystemStreamPartition, startingOffset);
+  }
+
+  /**
+   * Starts the underlying SystemConsumer.
+   */
+  public void start() {
+    if (isStarted) {
+      log.info("Coordinator stream consumer already started");
+      return;
+    }
+    log.info("Starting coordinator stream system consumer.");
+    systemConsumer.start();
+    isStarted = true;
+  }
+
+  /**
+   * Stops the underlying SystemConsumer.
+   */
+  public void stop() {
+    log.info("Stopping coordinator stream system consumer.");
+    systemConsumer.stop();
+  }
+
+  /**
+   * Reads all messages from the earliest offset, all the way to the latest.
+   * Every message is added to the bootstrapped stream set; set-config messages
+   * are additionally folded into the bootstrapped configuration map.
+   */
+  public void bootstrap() {
+    log.info("Bootstrapping configuration from coordinator stream.");
+    SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+
+    try {
+      while (iterator.hasNext()) {
+        IncomingMessageEnvelope envelope = iterator.next();
+        Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
+        Map<String, Object> valueMap = null;
+        if (envelope.getMessage() != null) {
+          valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
+        }
+        CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
+        log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+        bootstrappedStreamSet.add(coordinatorStreamMessage);
+        if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+          String configKey = coordinatorStreamMessage.getKey();
+          if (coordinatorStreamMessage.isDelete()) {
+            configMap.remove(configKey);
+          } else {
+            String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
+            configMap.put(configKey, configValue);
+          }
+        }
+      }
+      log.debug("Bootstrapped configuration: {}", configMap);
+      isBootstrapped = true;
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  public Set<CoordinatorStreamMessage> getBoostrappedStream() {
+    log.info("Returning the bootstrapped data from the stream");
+    if (!isBootstrapped) {
+      bootstrap();
+    }
+    return bootstrappedStreamSet;
+  }
+
+  public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
+    log.debug("Bootstrapping coordinator stream for messages of type {}", type);
+    bootstrap();
+    HashSet<CoordinatorStreamMessage> bootstrappedStream = new HashSet<CoordinatorStreamMessage>();
+    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
+      if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
+        bootstrappedStream.add(coordinatorStreamMessage);
+      }
+    }
+    return bootstrappedStream;
+  }
+
+  /**
+   * @return The bootstrapped configuration that's been read after bootstrap has
+   *         been invoked.
+   */
+  public Config getConfig() {
+    if (isBootstrapped) {
+      return new MapConfig(configMap);
+    } else {
+      throw new SamzaException("Must call bootstrap before retrieving config.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..0f3e10e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemProducer that provides helpful methods for dealing
+ * with the coordinator stream.
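+ * 
+ * <p>
+ * A typical usage, sketched below (how the SystemProducer and SystemAdmin are
+ * obtained is system-specific and illustrative only):
+ * </p>
+ * 
+ * <pre>
+ * CoordinatorStreamSystemProducer producer =
+ *     new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
+ * producer.register("job-runner");
+ * producer.start();
+ * producer.send(new CoordinatorStreamMessage.SetConfig("job-runner", "job.name", "my-job-name"));
+ * producer.stop();
+ * </pre>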
+ */
+public class CoordinatorStreamSystemProducer {
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemProducer.class);
+
+  private final Serde<List<?>> keySerde;
+  private final Serde<Map<String, Object>> messageSerde;
+  private final SystemStream systemStream;
+  private final SystemProducer systemProducer;
+  private final SystemAdmin systemAdmin;
+  private boolean isStarted;
+
+  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
+    this(systemStream, systemProducer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+  }
+
+  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+    this.systemStream = systemStream;
+    this.systemProducer = systemProducer;
+    this.systemAdmin = systemAdmin;
+    this.keySerde = keySerde;
+    this.messageSerde = messageSerde;
+  }
+
+  /**
+   * Registers a source with the underlying SystemProducer.
+   * 
+   * @param source
+   *          The source to register.
+   */
+  public void register(String source) {
+    systemProducer.register(source);
+  }
+
+  /**
+   * Starts the underlying SystemProducer.
+   */
+  public void start() {
+    if (isStarted) {
+      log.info("Coordinator stream producer already started");
+      return;
+    }
+    log.info("Starting coordinator stream producer.");
+    systemProducer.start();
+    isStarted = true;
+  }
+
+  /**
+   * Stops the underlying SystemProducer.
+   */
+  public void stop() {
+    log.info("Stopping coordinator stream producer.");
+    systemProducer.stop();
+  }
+
+  /**
+   * Serialize and send a coordinator stream message.
+   * 
+   * @param message
+   *          The message to send.
+   */
+  public void send(CoordinatorStreamMessage message) {
+    log.debug("Sending {}", message);
+    try {
+      String source = message.getSource();
+      byte[] key = keySerde.toBytes(Arrays.asList(message.getKeyArray()));
+      byte[] value = null;
+      if (!message.isDelete()) {
+        value = messageSerde.toBytes(message.getMessageMap());
+      }
+      OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(systemStream, Integer.valueOf(0), key, value);
+      systemProducer.send(source, envelope);
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Helper method that sends a series of SetConfig messages to the coordinator
+   * stream.
+   * 
+   * @param source
+   *          An identifier to denote which source is sending a message. This
+   *          can be any arbitrary string.
+   * @param config
+   *          The config object to store in the coordinator stream.
+   */
+  public void writeConfig(String source, Config config) {
+    log.debug("Writing config: {}", config);
+    for (Map.Entry<String, String> configPair : config.entrySet()) {
+      send(new CoordinatorStreamMessage.SetConfig(source, configPair.getKey(), configPair.getValue()));
+    }
+    systemProducer.flush(source);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index eb22d2e..c26690a 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -20,11 +20,13 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 
+
 /**
  * <p>
  * The data model used to represent a task. The model is used in the job
@@ -39,12 +41,12 @@ import org.apache.samza.system.SystemStreamPartition;
  */
 public class TaskModel implements Comparable<TaskModel> {
   private final TaskName taskName;
-  private final Set<SystemStreamPartition> systemStreamPartitions;
+  private final Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets;
   private final Partition changelogPartition;
 
-  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+  public TaskModel(TaskName taskName, Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, Partition changelogPartition) {
     this.taskName = taskName;
-    this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
+    this.systemStreamPartitionsToOffsets = Collections.unmodifiableMap(systemStreamPartitionsToOffsets);
     this.changelogPartition = changelogPartition;
   }
 
@@ -53,55 +55,55 @@ public class TaskModel implements Comparable<TaskModel> {
   }
 
   public Set<SystemStreamPartition> getSystemStreamPartitions() {
-    return systemStreamPartitions;
+    return systemStreamPartitionsToOffsets.keySet();
   }
 
   public Partition getChangelogPartition() {
     return changelogPartition;
   }
 
-  @Override
-  public String toString() {
-    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
+  public Map<SystemStreamPartition, String> getCheckpointedOffsets() {
+    return systemStreamPartitionsToOffsets;
   }
 
   @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((changelogPartition == null) ? 0 : changelogPartition.hashCode());
-    result = prime * result + ((systemStreamPartitions == null) ? 0 : systemStreamPartitions.hashCode());
-    result = prime * result + ((taskName == null) ? 0 : taskName.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
+  public boolean equals(Object o) {
+    if (this == o) {
       return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
+    }
+    if (o == null || getClass() != o.getClass()) {
       return false;
-    TaskModel other = (TaskModel) obj;
-    if (changelogPartition == null) {
-      if (other.changelogPartition != null)
-        return false;
-    } else if (!changelogPartition.equals(other.changelogPartition))
+    }
+
+    TaskModel taskModel = (TaskModel) o;
+
+    if (!changelogPartition.equals(taskModel.changelogPartition)) {
       return false;
-    if (systemStreamPartitions == null) {
-      if (other.systemStreamPartitions != null)
-        return false;
-    } else if (!systemStreamPartitions.equals(other.systemStreamPartitions))
+    }
+    if (!systemStreamPartitionsToOffsets.equals(taskModel.systemStreamPartitionsToOffsets)) {
       return false;
-    if (taskName == null) {
-      if (other.taskName != null)
-        return false;
-    } else if (!taskName.equals(other.taskName))
+    }
+    if (!taskName.equals(taskModel.taskName)) {
       return false;
+    }
+
     return true;
   }
 
+  @Override
+  public int hashCode() {
+    int result = taskName.hashCode();
+    result = 31 * result + systemStreamPartitionsToOffsets.hashCode();
+    result = 31 * result + changelogPartition.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitionsToOffsets.keySet() + ", changeLogPartition=" + changelogPartition + "]";
+  }
+
   public int compareTo(TaskModel other) {
     return taskName.compareTo(other.getTaskName());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 7dc431c..172358a 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.serializers.model;
 
-import java.util.Set;
+import java.util.Map;
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
@@ -31,14 +31,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
  */
 public abstract class JsonTaskModelMixIn {
   @JsonCreator
-  public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
+  public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions-offsets") Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, @JsonProperty("changelog-partition") Partition changelogPartition) {
   }
 
   @JsonProperty("task-name")
   abstract TaskName getTaskName();
 
-  @JsonProperty("system-stream-partitions")
-  abstract Set<SystemStreamPartition> getSystemStreamPartitions();
+  @JsonProperty("system-stream-partitions-offsets")
+  abstract Map<SystemStreamPartition, String> getCheckpointedOffsets();
 
   @JsonProperty("changelog-partition")
   abstract Partition getChangelogPartition();

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 3517912..17410c5 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -29,7 +29,9 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
@@ -39,9 +41,11 @@ import org.codehaus.jackson.Version;
 import org.codehaus.jackson.map.DeserializationContext;
 import org.codehaus.jackson.map.JsonDeserializer;
 import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.KeyDeserializer;
 import org.codehaus.jackson.map.MapperConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.codehaus.jackson.map.SerializationConfig;
 import org.codehaus.jackson.map.SerializerProvider;
 import org.codehaus.jackson.map.introspect.AnnotatedField;
 import org.codehaus.jackson.map.introspect.AnnotatedMethod;
@@ -76,9 +80,11 @@ public class SamzaObjectMapper {
     // Setup custom serdes for simple data types.
     module.addSerializer(Partition.class, new PartitionSerializer());
     module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
+    module.addKeySerializer(SystemStreamPartition.class, new SystemStreamPartitionKeySerializer());
     module.addSerializer(TaskName.class, new TaskNameSerializer());
     module.addDeserializer(Partition.class, new PartitionDeserializer());
     module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
+    module.addKeyDeserializer(SystemStreamPartition.class, new SystemStreamPartitionKeyDeserializer());
     module.addDeserializer(Config.class, new ConfigDeserializer());
 
     // Setup mixins for data models.
@@ -88,6 +94,7 @@ public class SamzaObjectMapper {
     mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
     mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+    mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false);
 
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
@@ -138,6 +145,23 @@ public class SamzaObjectMapper {
     }
   }
 
+  public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
+    @Override
+    public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider)
+        throws IOException {
+      String ssp = Util.sspToString(systemStreamPartition);
+      jgen.writeFieldName(ssp);
+    }
+  }
+
+  public static class SystemStreamPartitionKeyDeserializer extends KeyDeserializer {
+    @Override
+    public Object deserializeKey(String sspString, DeserializationContext ctxt)
+        throws IOException {
+      return Util.stringToSsp(sspString);
+    }
+  }
+
   public static class SystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> {
     @Override
     public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
new file mode 100644
index 0000000..fff7634
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ChangelogPartitionManager is used to persist and read the task-to-changelog-partition mapping from the coordinator stream.
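+ *
+ * A usage sketch (the source and task names are illustrative only):
+ *
+ * <pre>
+ * ChangelogPartitionManager manager =
+ *     new ChangelogPartitionManager(coordinatorStreamProducer, coordinatorStreamConsumer, "job-coordinator");
+ * manager.register(new TaskName("Partition 0"));
+ * manager.start();
+ * manager.writeChangeLogPartitionMapping(Collections.singletonMap(new TaskName("Partition 0"), 0));
+ * Map&lt;TaskName, Integer&gt; mapping = manager.readChangeLogPartitionMapping();
+ * manager.stop();
+ * </pre>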
+ */
+public class ChangelogPartitionManager {
+
+  private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
+  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+  private boolean isCoordinatorConsumerRegistered = false;
+  private String source;
+
+  public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+    this.coordinatorStreamProducer = coordinatorStreamProducer;
+    this.source = "Unknown";
+  }
+
+  public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+      CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+      String source) {
+    this(coordinatorStreamProducer, coordinatorStreamConsumer);
+    this.source = source;
+  }
+
+  public void start() {
+    coordinatorStreamProducer.start();
+    coordinatorStreamConsumer.start();
+  }
+
+  public void stop() {
+    coordinatorStreamConsumer.stop();
+    coordinatorStreamProducer.stop();
+  }
+
+  /**
+   * Registers this manager to write the changelog mapping for a particular task.
+   * @param taskName The task name to be registered for changelog mapping.
+   */
+  public void register(TaskName taskName) {
+    log.debug("Adding taskName {} to {}", taskName, this);
+    if (!isCoordinatorConsumerRegistered) {
+      coordinatorStreamConsumer.register();
+      isCoordinatorConsumerRegistered = true;
+    }
+    coordinatorStreamProducer.register(taskName.getTaskName());
+  }
+
+  /**
+   * Reads the taskName-to-partition mapping that is being maintained by this ChangelogPartitionManager.
+   * @return TaskName to changelog partition mapping, or an empty map if there were no messages.
+   */
+  public Map<TaskName, Integer> readChangeLogPartitionMapping() {
+    log.debug("Reading changelog partition information");
+    Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetChangelogMapping.TYPE);
+    HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
+    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+      SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
+      changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
+      log.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
+    }
+    return changelogMapping;
+  }
+
+  /**
+   * Writes the taskName-to-partition mapping that is being maintained by this ChangelogPartitionManager.
+   * @param changelogEntries The entries that need to be written to the coordinator stream; the map takes each taskName
+   *                       and its corresponding changelog partition.
+   */
+  public void writeChangeLogPartitionMapping(Map<TaskName, Integer> changelogEntries) {
+    log.debug("Updating changelog information with: ");
+    for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
+      log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
+      SetChangelogMapping changelogMapping = new SetChangelogMapping(source,
+          entry.getKey().getTaskName(),
+          entry.getValue());
+      coordinatorStreamProducer.send(changelogMapping);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index ddc30af..2e3aeb8 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -26,14 +26,18 @@ import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.{Config, StreamConfig}
 import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, Util}
+import org.apache.samza.util.CommandLine
 import org.apache.samza.{Partition, SamzaException}
 import scala.collection.JavaConversions._
 import org.apache.samza.util.Logging
 import org.apache.samza.coordinator.JobCoordinator
 
+import scala.collection.immutable.HashMap
+
+
 /**
  * Command-line tool for inspecting and manipulating the checkpoints for a job.
  * This can be used, for example, to force a job to re-process a stream from the
@@ -113,31 +117,30 @@ object CheckpointTool {
     }
   }
 
+  def apply(config: Config, offsets: TaskNameToCheckpointMap) = {
+    val factory = new CoordinatorStreamSystemFactory
+    val coordinatorStreamConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap())
+    val coordinatorStreamProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap())
+    val manager = new CheckpointManager(coordinatorStreamProducer, coordinatorStreamConsumer, "checkpoint-tool")
+    new CheckpointTool(config, offsets, manager)
+  }
+
   def main(args: Array[String]) {
     val cmdline = new CheckpointToolCommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    val tool = new CheckpointTool(config, cmdline.newOffsets)
+    val tool = CheckpointTool(config, cmdline.newOffsets)
     tool.run
   }
 }
 
-class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extends Logging {
-  val manager = config.getCheckpointManagerFactory match {
-    case Some(className) =>
-      Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
-    case _ =>
-      throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
-  }
-
-  // The CheckpointManagerFactory needs to perform this same operation when initializing
-  // the manager. TODO figure out some way of avoiding duplicated work.
+class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manager: CheckpointManager) extends Logging {
 
   def run {
     info("Using %s" format manager)
 
     // Find all the TaskNames that would be generated for this job config
-    val coordinator = JobCoordinator(config, 1)
+    val coordinator = JobCoordinator(config)
     val taskNames = coordinator
       .jobModel
       .getContainers
@@ -165,7 +168,10 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extend
 
   /** Load the most recent checkpoint state for all a specified TaskName. */
   def readLastCheckpoint(taskName:TaskName): Map[SystemStreamPartition, String] = {
-    manager.readLastCheckpoint(taskName).getOffsets.toMap
+    Option(manager.readLastCheckpoint(taskName))
+            .getOrElse(new Checkpoint(new HashMap[SystemStreamPartition, String]()))
+            .getOffsets
+            .toMap
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 85c1749..20e5d26 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,8 @@ object OffsetManager extends Logging {
     config: Config,
     checkpointManager: CheckpointManager = null,
     systemAdmins: Map[String, SystemAdmin] = Map(),
-    offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics) = {
-
+    offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+    latestOffsets: Map[SystemStreamPartition, String] = Map()) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
 
     val offsetSettings = systemStreamMetadata
@@ -99,7 +99,7 @@ object OffsetManager extends Logging {
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
       }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
+    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics, latestOffsets)
   }
 }
 
@@ -142,12 +142,19 @@ class OffsetManager(
   /**
    * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
    */
-  val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics ) extends Logging {
+  val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+
+  /*
+   * The previously read checkpoints restored from the coordinator stream
+   */
+  val previousCheckpointedOffsets: Map[SystemStreamPartition, String] = Map()
+  ) extends Logging {
 
   /**
    * Last offsets processed for each SystemStreamPartition.
    */
-  var lastProcessedOffsets = Map[SystemStreamPartition, String]()
+  // Filter out null offset values; we can't use them. They exist only to carry SSP information.
+  var lastProcessedOffsets = previousCheckpointedOffsets.filter(_._2 != null)
 
   /**
    * Offsets to start reading from for each SystemStreamPartition. This
@@ -170,7 +177,8 @@ class OffsetManager(
 
   def start {
     registerCheckpointManager
-    loadOffsetsFromCheckpointManager
+    initializeCheckpointManager
+    loadOffsets
     stripResetStreams
     loadStartingOffsets
     loadDefaults
@@ -240,18 +248,21 @@ class OffsetManager(
     }
   }
 
+  private def initializeCheckpointManager {
+    if (checkpointManager != null) {
+      checkpointManager.start
+    } else {
+      debug("Skipping offset load from checkpoint manager because no manager was defined.")
+    }
+  }
+
   /**
    * Loads last processed offsets from checkpoint manager for all registered
    * partitions.
    */
-  private def loadOffsetsFromCheckpointManager {
-    if (checkpointManager != null) {
-      debug("Loading offsets from checkpoint manager.")
-
-      checkpointManager.start
-
-      lastProcessedOffsets ++= systemStreamPartitions.keys
-        .flatMap(restoreOffsetsFromCheckpoint(_)).filter {
+  private def loadOffsets {
+    debug("Loading offsets")
+    lastProcessedOffsets = lastProcessedOffsets.filter {
           case (systemStreamPartition, offset) =>
             val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
             if (!shouldKeep) {
@@ -260,26 +271,7 @@ class OffsetManager(
             info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
             shouldKeep
         }
-    } else {
-      debug("Skipping offset load from checkpoint manager because no manager was defined.")
-    }
-  }
-
-  /**
-   * Loads last processed offsets for a single taskName.
-   */
-  private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[SystemStreamPartition, String] = {
-    debug("Loading checkpoints for taskName: %s." format taskName)
-
-    val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    if (checkpoint != null) {
-      checkpoint.getOffsets.toMap
-    } else {
-      info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-
-      Map()
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
deleted file mode 100644
index 2a87a6e..0000000
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.file
-
-import java.io.File
-import java.io.FileNotFoundException
-import java.io.FileOutputStream
-import java.util
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.serializers.CheckpointSerde
-import scala.io.Source
-
-class FileSystemCheckpointManager(
-  jobName: String,
-  root: File,
-  serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager {
-
-  override def register(taskName: TaskName):Unit = Unit
-
-  def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
-
-  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
-    val bytes = serde.toBytes(checkpoint)
-    val fos = new FileOutputStream(getCheckpointFile(taskName))
-
-    fos.write(bytes)
-    fos.close
-  }
-
-  def readLastCheckpoint(taskName: TaskName): Checkpoint = {
-    try {
-      val bytes = Source.fromFile(getCheckpointFile(taskName)).map(_.toByte).toArray
-
-      serde.fromBytes(bytes)
-    } catch {
-      case e: FileNotFoundException => null
-    }
-  }
-
-  def start {
-    if (!root.exists) {
-      throw new SamzaException("Root directory for file system checkpoint manager does not exist: %s" format root)
-    }
-  }
-
-  def stop {}
-
-  private def getFile(jobName: String, taskName: TaskName, fileType:String) =
-    new File(root, "%s-%s-%s" format (jobName, taskName, fileType))
-
-  private def getChangeLogPartitionMappingFile() = getFile(jobName, new TaskName("partition-mapping"), "changelog-partition-mapping")
-
-  override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
-    try {
-      val bytes = Source.fromFile(getChangeLogPartitionMappingFile()).map(_.toByte).toArray
-      serde.changelogPartitionMappingFromBytes(bytes)
-    } catch {
-      case e: FileNotFoundException => new util.HashMap[TaskName, java.lang.Integer]()
-    }
-  }
-
-  def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = {
-    val hashmap = new util.HashMap[TaskName, java.lang.Integer](mapping)
-    val bytes = serde.changelogPartitionMappingToBytes(hashmap)
-    val fos = new FileOutputStream(getChangeLogPartitionMappingFile())
-
-    fos.write(bytes)
-    fos.close
-  }
-}
-
-class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
-  def getCheckpointManager(config: Config, registry: MetricsRegistry) = {
-    val name = config
-      .getName
-      .getOrElse(throw new SamzaException("Missing job name in configs"))
-    val root = config
-      .getFileSystemCheckpointRoot
-      .getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
-    new FileSystemCheckpointManager(name, new File(root))
-  }
-}


[3/4] samza git commit: SAMZA-465: Use coordinator stream and eliminate CheckpointManager

Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 3b6685e..e4b14f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -20,6 +20,9 @@
 package org.apache.samza.config
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.Logging
 
 object JobConfig {
   // job config constants
@@ -35,15 +38,45 @@ object JobConfig {
   val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
   val JOB_NAME = "job.name" // streaming.job_name
   val JOB_ID = "job.id" // streaming.job_id
-
+  val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
+  val JOB_CONTAINER_COUNT = "job.container.count"
+  val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
+  val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
 
   implicit def Config2Job(config: Config) = new JobConfig(config)
 }
 
-class JobConfig(config: Config) extends ScalaMapConfig(config) {
+class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getName = getOption(JobConfig.JOB_NAME)
 
+  def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({
+    // If no coordinator system is configured, try to infer it when exactly one system is configured.
+    val systemNames = config.getSystemNames.toSet
+    if (systemNames.size == 1) {
+      val systemName = systemNames.iterator.next
+      info("No coordinator system defined, so defaulting to %s" format systemName)
+      systemName
+    } else {
+      throw new ConfigException("Missing job.coordinator.system configuration.")
+    }
+  })
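+
+  // Configuration sketch (the system name and values are illustrative only):
+  //   job.coordinator.system=kafka
+  //   job.coordinator.replication.factor=3
+  //   job.coordinator.segment.bytes=26214400
+  // With these settings the coordinator stream lives in the "kafka" system; the
+  // replication factor and segment bytes shown match the defaults applied below.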
+
+  def getContainerCount = {
+    getOption(JobConfig.JOB_CONTAINER_COUNT) match {
+      case Some(count) => count.toInt
+      case _ =>
+        // To maintain backwards compatibility, honor yarn.container.count for now.
+        // TODO get rid of this in a future release.
+        getOption("yarn.container.count") match {
+          case Some(count) =>
+            warn("Configuration 'yarn.container.count' is deprecated. Please use %s." format JobConfig.JOB_CONTAINER_COUNT)
+            count.toInt
+          case _ => 1
+        }
+    }
+  }
+
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
   def getJobId = getOption(JobConfig.JOB_ID)
@@ -53,4 +86,30 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) {
   def getConfigRewriterClass(name: String) = getOption(JobConfig.CONFIG_REWRITER_CLASS format name)
 
   def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
+
+  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+
+  def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match {
+    case Some(rplFactor) => rplFactor
+    case _ =>
+      // TODO get rid of checkpoint configs in a future release
+      getOption("task.checkpoint.replication.factor") match {
+        case Some(rplFactor) =>
+          warn("Configuration 'task.checkpoint.replication.factor' is deprecated. Please use %s." format JobConfig.JOB_REPLICATION_FACTOR)
+          rplFactor
+        case _ => "3"
+      }
+  }
+
+  def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match {
+    case Some(segBytes) => segBytes
+    case _ =>
+      // TODO get rid of checkpoint configs in a future release
+      getOption("task.checkpoint.segment.bytes") match {
+        case Some(segBytes) =>
+          warn("Configuration 'task.checkpoint.segment.bytes' is deprecated. Please use %s." format JobConfig.JOB_SEGMENT_BYTES)
+          segBytes
+        case _ => "26214400"
+      }
+  }
 }

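For illustration, a minimal sketch of how the new JobConfig accessors above
resolve values (the job and system names here are hypothetical; the
deprecated yarn/checkpoint keys are still honored, with a warning):

    import org.apache.samza.config.MapConfig
    import org.apache.samza.config.JobConfig.Config2Job
    import scala.collection.JavaConversions._

    val config = new MapConfig(Map(
      "job.name" -> "my-job",               // hypothetical values
      "job.coordinator.system" -> "kafka",
      "yarn.container.count" -> "4"))       // deprecated key, still honored
    config.getContainerCount                // 4, after a deprecation warning
    config.getCoordinatorSystemName         // "kafka"
    config.getCoordinatorReplicationFactor  // "3" (the default)
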
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 1a2dd44..e94a473 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -21,9 +21,9 @@ package org.apache.samza.config
 
 object ShellCommandConfig {
   /**
-   * This environment variable is used to store a JSON serialized map of all configuration.
+   * This environment variable is used to store a JSON serialized map of all coordinator system configs.
    */
-  val ENV_CONFIG = "SAMZA_CONFIG"
+  val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
 
   /**
    * The ID for a container. This is an integer number between 0 and

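A sketch of the container-side counterpart (an assumption based on the
SamzaObjectMapper-based deserialization used elsewhere in Samza, not part of
this hunk): the job submitter serializes the coordinator system config into
this variable, and the container reads it back before contacting the
coordinator stream.

    import org.apache.samza.config.Config
    import org.apache.samza.config.ShellCommandConfig
    import org.apache.samza.serializers.model.SamzaObjectMapper

    // Hedged sketch: deserialize the coordinator system config from the env.
    val configStr = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG)
    val coordinatorSystemConfig =
      SamzaObjectMapper.getObjectMapper.readValue(configStr, classOf[Config])
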
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index adef09e..e172589 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -21,8 +21,6 @@ package org.apache.samza.config
 
 import org.apache.samza.util.Logging
 import scala.collection.JavaConversions._
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Util
 import org.apache.samza.system.SystemStream
 
 object StreamConfig {

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index cd06c06..0b3a235 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -19,8 +19,8 @@
 
 package org.apache.samza.config
 
-import org.apache.samza.util.Util
 import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Util
 
 object TaskConfig {
   // task config constants

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 720fbdc..56819e0 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,7 +21,7 @@ package org.apache.samza.container
 
 import java.io.File
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager }
+import org.apache.samza.checkpoint.{ CheckpointManager, OffsetManager }
 import org.apache.samza.config.Config
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
@@ -30,6 +30,7 @@ import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -328,17 +329,16 @@ object SamzaContainer extends Logging {
 
     info("Got metrics reporters: %s" format reporters.keys)
 
-    val checkpointManager = config.getCheckpointManagerFactory match {
-      case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
-        Util
-          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
-          .getCheckpointManager(config, samzaContainerMetrics.registry)
-      case _ => null
-    }
+    val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
+    val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
+    val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
 
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
+    val combinedOffsets: Map[SystemStreamPartition, String] =
+      containerModel.getTasks.values().flatMap(_.getCheckpointedOffsets).toMap
+
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
 
     info("Got offset manager: %s" format offsetManager)
 

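The checkpoint-manager factory lookup is gone: the container now always
wires its CheckpointManager to the coordinator stream. A condensed sketch of
the wiring above (config, registry, and containerId as in the surrounding
code):

    import org.apache.samza.checkpoint.CheckpointManager
    import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory

    val factory = new CoordinatorStreamSystemFactory()
    val consumer = factory.getCoordinatorStreamSystemConsumer(config, registry)
    val producer = factory.getCoordinatorStreamSystemProducer(config, registry)
    val checkpointManager =
      new CheckpointManager(producer, consumer, String.valueOf(containerId))
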
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index c14f2f6..5b43b58 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -19,61 +19,74 @@
 
 package org.apache.samza.coordinator
 
+
 import org.apache.samza.config.Config
-import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.SamzaException
 import org.apache.samza.container.grouper.task.GroupByContainerCount
-import org.apache.samza.util.Util
-import org.apache.samza.checkpoint.CheckpointManagerFactory
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import java.util
 import org.apache.samza.container.TaskName
+import org.apache.samza.storage.ChangelogPartitionManager
 import org.apache.samza.util.Logging
-import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.config.StorageConfig.Config2Storage
+import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.Partition
-import org.apache.samza.job.model.TaskModel
 import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import java.net.URL
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamMessage, CoordinatorStreamSystemFactory}
+import org.apache.samza.config.ConfigRewriter
 
+/**
+ * Helper companion object that is responsible for wiring up a JobCoordinator
+ * given a Config object.
+ */
 object JobCoordinator extends Logging {
+
   /**
-   * Build a JobCoordinator using a Samza job's configuration.
+   * @param coordinatorSystemConfig A config object that contains job.name,
+   * job.id, and all system.&lt;job-coordinator-system-name&gt;.*
+   * configuration. The method will use this config to read all configuration
+   * from the coordinator stream, and instantiate a JobCoordinator.
    */
-  def apply(config: Config, containerCount: Int) = {
-    val jobModel = buildJobModel(config, containerCount)
-    val server = new HttpServer
-    server.addServlet("/*", new JobServlet(jobModel))
-    new JobCoordinator(jobModel, server)
+  def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobCoordinator = {
+    val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
+    val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+    val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+    info("Registering coordinator system stream.")
+    coordinatorSystemConsumer.register
+    debug("Starting coordinator system stream.")
+    coordinatorSystemConsumer.start
+    debug("Bootstrapping coordinator system stream.")
+    coordinatorSystemConsumer.bootstrap
+    debug("Stopping coordinator system stream.")
+    coordinatorSystemConsumer.stop
+    val config = coordinatorSystemConsumer.getConfig
+    info("Got config: %s" format config)
+    val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+    val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+    getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager)
   }
 
+  def apply(coordinatorSystemConfig: Config): JobCoordinator = apply(coordinatorSystemConfig, new MetricsRegistryMap())
+
   /**
-   * Gets a CheckpointManager from the configuration.
+   * Build a JobCoordinator using a Samza job's configuration.
    */
-  def getCheckpointManager(config: Config) = {
-    config.getCheckpointManagerFactory match {
-      case Some(checkpointFactoryClassName) =>
-        Util
-          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
-          .getCheckpointManager(config, new MetricsRegistryMap)
-      case _ =>
-        if (!config.getStoreNames.isEmpty) {
-          throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified.  " +
-            "Unable to start job as there would be no place to store changelog partition mapping.")
-        }
-        null
-    }
+  def getJobCoordinator(config: Config, checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager) = {
+    val containerCount = config.getContainerCount
+    val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager)
+    val server = new HttpServer
+    server.addServlet("/*", new JobServlet(jobModelGenerator))
+    new JobCoordinator(jobModelGenerator(), server, checkpointManager)
   }
 
   /**
@@ -115,74 +128,135 @@ object JobCoordinator extends Logging {
   }
 
   /**
-   * Build a full Samza job model using the job configuration.
+   * Re-writes configuration using a ConfigRewriter, if one is defined. If
+   * there is no ConfigRewriter defined for the job, then this method is a
+   * no-op.
+   *
+   * @param config The config to re-write.
    */
-  def buildJobModel(config: Config, containerCount: Int) = {
-    // TODO containerCount should go away when we generalize the job coordinator, 
+  def rewriteConfig(config: Config): Config = {
+    def rewrite(c: Config, rewriterName: String): Config = {
+      val klass = config
+        .getConfigRewriterClass(rewriterName)
+        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](klass)
+      info("Re-writing config with " + rewriter)
+      rewriter.rewrite(rewriterName, c)
+    }
+
+    config.getConfigRewriters match {
+      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+      case _ => config
+    }
+  }
+
+  /**
+   * Initializes the job model and creates a JobModel generator, which can be used to generate
+   * new JobModels that catch up with the latest content of the coordinator stream.
+   */
+  private def initializeJobModel(config: Config,
+                                 containerCount: Int,
+                                 checkpointManager: CheckpointManager,
+                                 changelogManager: ChangelogPartitionManager): () => JobModel = {
+    // TODO containerCount should go away when we generalize the job coordinator,
     // and have a non-yarn-specific way of specifying container count.
-    val checkpointManager = getCheckpointManager(config)
+
+    // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getInputStreamPartitions(config)
     val grouper = getSystemStreamPartitionGrouper(config)
-    val previousChangelogeMapping = if (checkpointManager != null) {
-      checkpointManager.start
-      checkpointManager.readChangeLogPartitionMapping
-    } else {
-      new util.HashMap[TaskName, java.lang.Integer]()
+    info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+    val groups = grouper.group(allSystemStreamPartitions)
+
+    // Initialize the ChangelogPartitionManager and the CheckpointManager
+    val previousChangelogeMapping = if (changelogManager != null)
+    {
+      changelogManager.start
+      changelogManager.readChangeLogPartitionMapping
     }
-    var maxChangelogPartitionId = previousChangelogeMapping
-      .values
-      .map(_.toInt)
-      .toList
-      .sorted
-      .lastOption
-      .getOrElse(-1)
-
-    // Assign all SystemStreamPartitions to TaskNames.
-    val taskModels = {
-      val groups = grouper.group(allSystemStreamPartitions)
-      info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
-      groups
-        .map {
-          case (taskName, systemStreamPartitions) =>
-            val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
-              case Some(changelogPartitionId) => new Partition(changelogPartitionId)
-              case _ =>
-                // If we've never seen this TaskName before, then assign it a 
-                // new changelog.
-                maxChangelogPartitionId += 1
-                info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
-                new Partition(maxChangelogPartitionId)
-            }
-            new TaskModel(taskName, systemStreamPartitions, changelogPartition)
-        }
-        .toSet
+    else
+    {
+      new util.HashMap[TaskName, java.lang.Integer]()
     }
+    checkpointManager.start
+    groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
+
+    // Generate the jobModel
+    def jobModelGenerator(): JobModel = refreshJobModel(config,
+                                                        allSystemStreamPartitions,
+                                                        checkpointManager,
+                                                        groups,
+                                                        previousChangelogeMapping,
+                                                        containerCount)
+
+    val jobModel = jobModelGenerator()
 
-    // Save the changelog mapping back to the checkpoint manager.
-    if (checkpointManager != null) {
-      // newChangelogMapping is the merging of all current task:changelog 
+    // Save the changelog mapping back to the ChangelogPartitionmanager
+    if (changelogManager != null)
+    {
+      // newChangelogMapping is the merging of all current task:changelog
       // assignments with whatever we had before (previousChangelogeMapping).
-      // We must persist legacy changelog assignments so that 
-      // maxChangelogPartitionId always has the absolute max, not the current 
-      // max (in case the task with the highest changelog partition mapping 
+      // We must persist legacy changelog assignments so that
+      // maxChangelogPartitionId always has the absolute max, not the current
+      // max (in case the task with the highest changelog partition mapping
       // disappears.
-      val newChangelogMapping = taskModels.map(taskModel => {
-        taskModel.getTaskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
-      }).toMap ++ previousChangelogeMapping
+      val newChangelogMapping = jobModel.getContainers
+        .flatMap(_._2.getTasks)
+        .map { case (taskName, taskModel) => taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId) }
+        .toMap ++ previousChangelogeMapping
       info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
-      checkpointManager.writeChangeLogPartitionMapping(newChangelogMapping)
-      checkpointManager.stop
+      changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
     }
+    // Return a jobModelGenerator lambda that can be used to refresh the job model
+    jobModelGenerator
+  }
 
-    // Here is where we should put in a pluggable option for the 
-    // SSPTaskNameGrouper for locality, load-balancing, etc.
-    val containerGrouper = new GroupByContainerCount(containerCount)
-    val containerModels = containerGrouper
-      .group(taskModels)
-      .map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }
-      .toMap
+  /**
+   * Builds a full Samza job model, reading the latest checkpoints from the underlying
+   * coordinator stream.
+   * This method must be thread safe: it is invoked for every HTTP request from a container,
+   * and all invocations share the same coordinator stream producer and consumer instances.
+   */
+  private def refreshJobModel(config: Config,
+                              allSystemStreamPartitions: util.Set[SystemStreamPartition],
+                              checkpointManager: CheckpointManager,
+                              groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
+                              previousChangelogeMapping: util.Map[TaskName, Integer],
+                              containerCount: Int): JobModel = {
+    this.synchronized {
+      // If no mappings are present (the first time the job runs), default to -1 so that
+      // 0 becomes the first changelog partition ID assigned.
+      var maxChangelogPartitionId = previousChangelogeMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+
+      // Assign all SystemStreamPartitions to TaskNames.
+      val taskModels = groups.map { case (taskName, systemStreamPartitions) =>
+        val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName))
+          .getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
+        // For system stream partitions without a checkpoint, default their offsets to null.
+        val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
+        val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
+          case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+          case _ =>
+            // If we've never seen this TaskName before, then assign it a
+            // new changelog.
+            maxChangelogPartitionId += 1
+            info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
+            new Partition(maxChangelogPartitionId)
+        }
+        new TaskModel(taskName, offsetMap, changelogPartition)
+      }.toSet
+
+      // Here is where we should put in a pluggable option for the
+      // SSPTaskNameGrouper for locality, load-balancing, etc.
+      val containerGrouper = new GroupByContainerCount(containerCount)
+      val containerModels = containerGrouper
+        .group(taskModels)
+        .map { containerModel => Integer.valueOf(containerModel.getContainerId) -> containerModel }
+        .toMap
 
-    new JobModel(config, containerModels)
+      new JobModel(config, containerModels)
+    }
   }
 }
 
@@ -207,19 +281,31 @@ class JobCoordinator(
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer) extends Logging {
+  val server: HttpServer = null,
+
+  /**
+   * Handle to checkpoint manager that's used to refresh the JobModel
+   */
+  val checkpointManager: CheckpointManager) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
   def start {
-    debug("Starting HTTP server.")
-    server.start
-    info("Startd HTTP server: %s" format server.getUrl)
+    if (server != null) {
+      debug("Starting HTTP server.")
+      server.start
+      info("Startd HTTP server: %s" format server.getUrl)
+    }
   }
 
   def stop {
-    debug("Stopping HTTP server.")
-    server.stop
-    info("Stopped HTTP server.")
+    if (server != null) {
+      debug("Stopping HTTP server.")
+      server.stop
+      info("Stopped HTTP server.")
+      debug("Stopping checkpoint manager.")
+      checkpointManager.stop()
+      info("Stopped checkpoint manager.")
+    }
   }
 }
\ No newline at end of file

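Taken together, a caller now bootstraps a JobCoordinator from the pared-down
coordinator system config rather than from the full job config. A usage
sketch (fullConfig stands in for the job's complete configuration):

    import org.apache.samza.coordinator.JobCoordinator
    import org.apache.samza.util.Util

    val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(fullConfig)
    val coordinator = JobCoordinator(coordinatorSystemConfig)
    coordinator.start
    // Containers fetch the JobModel over HTTP from coordinator.server.getUrl.
    coordinator.stop
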
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
index 10986a4..dfe3a45 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
@@ -20,8 +20,6 @@
 package org.apache.samza.coordinator.server;
 
 import java.net.InetAddress
-import java.net.URI
-import java.net.UnknownHostException
 import javax.servlet.Servlet
 import org.apache.samza.SamzaException
 import org.eclipse.jetty.server.Connector
@@ -32,6 +30,7 @@ import org.eclipse.jetty.servlet.ServletHolder
 import java.net.URL
 import org.apache.samza.util.Logging
 
+
 /**
  * <p>A Jetty-based HTTP server. The server allows arbitrary servlets to be added
  * with the addServlet() method. The server is configured to automatically

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index 635c353..a3baddb 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -19,12 +19,13 @@
 
 package org.apache.samza.coordinator.server
 
+
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.util.Logging
 
 /**
  * A servlet that dumps the job model for a Samza job.
  */
-class JobServlet(jobModel: JobModel) extends ServletBase with Logging {
-  protected def getObjectToWrite() = jobModel
-}
+class JobServlet(jobModelGenerator: () => JobModel) extends ServletBase with Logging {
+  protected def getObjectToWrite() = jobModelGenerator()
+}
\ No newline at end of file

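Since the servlet now holds a generator rather than a fixed model, every
HTTP request serves a freshly computed JobModel. A sketch, where
jobModelGenerator is the () => JobModel lambda produced in JobCoordinator:

    import org.apache.samza.coordinator.server.{HttpServer, JobServlet}

    val server = new HttpServer
    server.addServlet("/*", new JobServlet(jobModelGenerator))
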
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
new file mode 100644
index 0000000..9283812
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.{Config, SystemConfig}
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemFactory, SystemStream}
+import org.apache.samza.util.Util
+
+/**
+ * A helper class that does wiring for CoordinatorStreamSystemConsumer and
+ * CoordinatorStreamSystemProducer. This factory should only be used in
+ * situations where the underlying SystemConsumer/SystemProducer does not
+ * exist.
+ */
+class CoordinatorStreamSystemFactory {
+  def getCoordinatorStreamSystemConsumer(config: Config, registry: MetricsRegistry) = {
+    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+    val systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem, config, registry)
+    new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin)
+  }
+
+  def getCoordinatorStreamSystemProducer(config: Config, registry: MetricsRegistry) = {
+    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+    val systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem, config, registry)
+    new CoordinatorStreamSystemProducer(coordinatorSystemStream, systemProducer, systemAdmin)
+  }
+}
\ No newline at end of file

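The consumer returned by this factory follows the register/start/bootstrap/
stop lifecycle seen in JobCoordinator.apply earlier in this patch. A
condensed sketch (config as in the callers above):

    import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
    import org.apache.samza.metrics.MetricsRegistryMap

    val factory = new CoordinatorStreamSystemFactory
    val consumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
    consumer.register
    consumer.start
    consumer.bootstrap             // read the coordinator stream up to head
    val jobConfig = consumer.getConfig
    consumer.stop
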
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 0b720ec..1c178a6 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,16 +20,27 @@
 package org.apache.samza.job
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.util.Util
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+
+object JobRunner {
+  val SOURCE = "job-runner"
 
-object JobRunner extends Logging {
   def main(args: Array[String]) {
     val cmdline = new CommandLine
     val options = cmdline.parser.parse(args: _*)
@@ -43,23 +54,43 @@ object JobRunner extends Logging {
  * on a config URI. The configFactory is instantiated, fed the configPath,
  * and returns a Config, which is used to execute the job.
  */
-class JobRunner(config: Config) extends Logging with Runnable {
-
-  def run() {
-    val conf = rewriteConfig(config)
-
-    val jobFactoryClass = conf.getStreamJobFactoryClass match {
+class JobRunner(config: Config) extends Logging {
+  def run() = {
+    debug("config: %s" format (config))
+    val jobFactoryClass = config.getStreamJobFactoryClass match {
       case Some(factoryClass) => factoryClass
       case _ => throw new SamzaException("no job factory class defined")
     }
-
     val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
-
     info("job factory: %s" format (jobFactoryClass))
-    debug("config: %s" format (conf))
+    val factory = new CoordinatorStreamSystemFactory
+    val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+    val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+
+    // Create the coordinator stream if it doesn't exist
+    info("Creating coordinator stream");
+    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+    systemAdmin.createCoordinatorStream(coordinatorSystemStream.getStream)
+
+    info("Storing config in coordinator stream.")
+    coordinatorSystemProducer.register(JobRunner.SOURCE)
+    coordinatorSystemProducer.start
+    coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
+    info("Loading old config from coordinator stream.")
+    coordinatorSystemConsumer.register
+    coordinatorSystemConsumer.start
+    coordinatorSystemConsumer.bootstrap
+    coordinatorSystemConsumer.stop
+    val oldConfig = coordinatorSystemConsumer.getConfig()
+    info("Deleting old configs that are no longer defined: %s".format(oldConfig.keySet -- config.keySet))
+    (oldConfig.keySet -- config.keySet).foreach(key => {
+      coordinatorSystemProducer.send(new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
+    })
+    coordinatorSystemProducer.stop
 
     // Create the actual job, and submit it.
-    val job = jobFactory.getJob(conf).submit
+    val job = jobFactory.getJob(config).submit
 
     info("waiting for job to start")
 
@@ -76,22 +107,6 @@ class JobRunner(config: Config) extends Logging with Runnable {
     }
 
     info("exiting")
-  }
-
-  // Apply any and all config re-writer classes that the user has specified
-  def rewriteConfig(config: Config): Config = {
-    def rewrite(c: Config, rewriterName: String): Config = {
-      val klass = config
-        .getConfigRewriterClass(rewriterName)
-        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-      val rewriter = Util.getObj[ConfigRewriter](klass)
-      info("Re-writing config file with " + rewriter)
-      rewriter.rewrite(rewriterName, c)
-    }
-
-    config.getConfigRewriters match {
-      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
-      case None => config
-    }
+    job
   }
 }

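A run now writes the submitted config into the coordinator stream, prunes
keys removed since the last submission, and only then submits the job. A
hedged end-to-end sketch (the job name, system, and factory choices are
illustrative; a real job would also need its inputs and task class):

    import org.apache.samza.config.MapConfig
    import org.apache.samza.job.JobRunner
    import scala.collection.JavaConversions._

    val config = new MapConfig(Map(
      "job.name" -> "my-job",
      "job.factory.class" -> "org.apache.samza.job.local.ThreadJobFactory",
      "job.coordinator.system" -> "kafka",
      "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory"))
    new JobRunner(config).run()
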
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fd9719a..4fac154 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -30,7 +30,7 @@ import org.apache.samza.util.{Logging, Util}
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
   def   getJob(config: Config): StreamJob = {
-    val coordinator = JobCoordinator(config, 1)
+    val coordinator = JobCoordinator(config)
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
     val commandBuilder = {
@@ -48,11 +48,12 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     }
     // JobCoordinator is stopped by ProcessJob when it exits
     coordinator.start
+    val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
 
     commandBuilder
-      .setConfig(config)
-      .setId(0)
-      .setUrl(coordinator.server.getUrl)
+      .setConfig(coordinatorSystemConfig)
+      .setId(0)
+      .setUrl(coordinator.server.getUrl)
 
     new ProcessJob(commandBuilder, coordinator)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 530255e..60ee36f 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.local
 
+
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.util.Logging
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
@@ -26,7 +28,6 @@ import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.config.TaskConfig._
 import org.apache.samza.container.SamzaContainer
 import org.apache.samza.job.{ StreamJob, StreamJobFactory }
-import org.apache.samza.util.Util
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.coordinator.JobCoordinator
 
@@ -36,7 +37,7 @@ import org.apache.samza.coordinator.JobCoordinator
 class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
-    val coordinator = JobCoordinator(config, 1)
+    val coordinator = JobCoordinator(config)
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
     // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
index 744eec0..4b658c1 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
@@ -18,19 +18,33 @@
  */
 
 package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.codehaus.jackson.`type`.TypeReference
 import org.codehaus.jackson.map.ObjectMapper
-import java.nio.ByteBuffer
 import org.apache.samza.config.Config
 
-class JsonSerde extends Serde[Object] {
-  val objectMapper = new ObjectMapper
+class JsonSerde[T] extends Serde[T] {
+  val mapper = SamzaObjectMapper.getObjectMapper()
+
+  def toBytes(obj: T): Array[Byte] = {
+    try {
+      mapper.writeValueAsString(obj).getBytes("UTF-8")
+    } catch {
+      case e: Exception => throw new SamzaException(e)
+    }
+  }
 
-  def toBytes(obj: Object) = objectMapper
-    .writeValueAsString(obj)
-    .getBytes("UTF-8")
+  def fromBytes(bytes: Array[Byte]): T = {
+    try {
+      mapper.readValue(new String(bytes, "UTF-8"), new TypeReference[T]() {})
+    } catch {
+      case e: Exception => throw new SamzaException(e)
+    }
+  }
 
-  def fromBytes(bytes: Array[Byte]) = objectMapper
-    .readValue(bytes, classOf[Object])
 }
 
 class JsonSerdeFactory extends SerdeFactory[Object] {

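A round-trip sketch with the now-generic serde (the map content is
arbitrary; note that toBytes/fromBytes wrap any Jackson failure in a
SamzaException):

    import org.apache.samza.serializers.JsonSerde

    val someMap = new java.util.HashMap[String, String]()
    someMap.put("key", "value")

    val serde = new JsonSerde[java.util.Map[String, String]]
    val bytes = serde.toBytes(someMap)     // UTF-8 JSON bytes
    val restored = serde.fromBytes(bytes)  // parsed back via SamzaObjectMapper
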
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index ec1d749..097f410 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -131,7 +131,11 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
     enterPosition
   }
 
-  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-    throw new SamzaException("Method not implemented")
+  def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  def createCoordinatorStream(streamName: String) {
+    throw new UnsupportedOperationException("Method not implemented.")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 1a67586..8a83566 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,14 +19,20 @@
 
 package org.apache.samza.util
 
-import java.net.URL
-import java.io.BufferedReader
+import java.net.{HttpURLConnection, URL}
+import java.io.{InputStream, BufferedReader, File, InputStreamReader}
 import java.lang.management.ManagementFactory
-import java.io.File
-import org.apache.samza.system.SystemStream
+import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
 import java.util.Random
-import org.apache.samza.job.model.JobModel
-import java.io.InputStreamReader
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
 
 object Util extends Logging {
   val random = new Random
@@ -114,14 +120,121 @@ object Util extends Logging {
    * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
    * @return String payload of the body of the HTTP response.
    */
-  def read(url: URL, timeout: Int = 30000): String = {
-    val conn = url.openConnection();
+  def read(url: URL, timeout: Int = 60000): String = {
+    var httpConn = getHttpConnection(url, timeout)
+    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+    retryBackoff.run(loop => {
+      if (httpConn.getResponseCode != 200) {
+        warn("Error: " + httpConn.getResponseCode)
+        val errorContent = readStream(httpConn.getErrorStream)
+        warn("Error reading stream, failed with response %s" format errorContent)
+        httpConn = getHttpConnection(url, timeout)
+      } else {
+        loop.done
+      }
+    },
+    (exception, loop) => {
+      exception match {
+        case e: Exception =>
+          loop.done
+          error("Unable to connect to Job coordinator server, received exception", e)
+          throw e
+      }
+    })
+
+    if(httpConn.getResponseCode != 200) {
+      throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
+    }
+    readStream(httpConn.getInputStream)
+  }
+
+  private def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+    val conn = url.openConnection()
     conn.setConnectTimeout(timeout)
     conn.setReadTimeout(timeout)
-    val br = new BufferedReader(new InputStreamReader(conn.getInputStream));
+    conn.asInstanceOf[HttpURLConnection]
+  }
+  private def readStream(stream: InputStream): String = {
+    val br = new BufferedReader(new InputStreamReader(stream));
     var line: String = null;
     val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
     br.close
+    stream.close
     body
   }
+
+
+  /**
+   * Generates a coordinator stream name from the job name and job ID.
+   * The stream name will be of the form
+   * __samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
+   */
+  def getCoordinatorStreamName(jobName: String, jobId: String) = {
+    "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+  }
+
+  /**
+   * Get a job's name and ID given a config. Job ID is defaulted to 1 if not
+   * defined in the config, and job name must be defined in config.
+   *
+   * @return A tuple of (jobName, jobId)
+   */
+  def getJobNameAndId(config: Config) = {
+    (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), config.getJobId.getOrElse("1"))
+  }
+
+  /**
+   * Given a job's full config object, build a subset config which includes
+   * only the job name, job id, and system config for the coordinator stream.
+   */
+  def buildCoordinatorStreamConfig(config: Config) = {
+    val (jobName, jobId) = getJobNameAndId(config)
+    // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
+    new MapConfig(config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
+      Map[String, String](JobConfig.JOB_NAME -> jobName, JobConfig.JOB_ID -> jobId, JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName))
+  }
+
+  /**
+   * Gets the coordinator system stream and system factory from the configuration.
+   * @param config The job configuration.
+   * @return A tuple of (coordinator SystemStream, SystemFactory).
+   */
+  def getCoordinatorSystemStreamAndFactory(config: Config) = {
+    val systemName = config.getCoordinatorSystemName
+    val (jobName, jobId) = Util.getJobNameAndId(config)
+    val streamName = Util.getCoordinatorStreamName(jobName, jobId)
+    val coordinatorSystemStream = new SystemStream(systemName, streamName)
+    val systemFactoryClassName = config
+      .getSystemFactory(systemName)
+      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+    (coordinatorSystemStream, systemFactory)
+  }
+
+  /**
+   * Converts an SSP to its string representation.
+   * @param ssp System stream partition
+   * @return The string representation of the SSP
+   */
+  def sspToString(ssp: SystemStreamPartition): String = {
+     ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId())
+  }
+
+  /**
+   * Parses the string form of an SSP back into a SystemStreamPartition.
+   * @param ssp The string form of the SSP
+   * @return An SSP typed object
+   */
+  def stringToSsp(ssp: String): SystemStreamPartition = {
+     val idx = ssp.indexOf('.');
+     val lastIdx = ssp.lastIndexOf('.')
+     if (idx < 0 || lastIdx < 0) {
+       throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition'")
+     }
+     new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
+                               new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
+  }
 }

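The new naming and SSP helpers compose as follows (values illustrative;
underscores in the job name and ID are replaced with dashes):

    import org.apache.samza.Partition
    import org.apache.samza.system.{SystemStream, SystemStreamPartition}
    import org.apache.samza.util.Util

    Util.getCoordinatorStreamName("my_job", "1")
    // "__samza_coordinator_my-job_1"

    val ssp = new SystemStreamPartition(new SystemStream("kafka", "PageViews"), new Partition(7))
    Util.sspToString(ssp)                  // "kafka.PageViews.7"
    Util.stringToSsp("kafka.PageViews.7")  // an equal SystemStreamPartition
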
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
new file mode 100644
index 0000000..59782fe
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.apache.samza.util.Util;
+
+/**
+ * Helper for creating a mock CoordinatorStreamSystemConsumer and
+ * CoordinatorStreamSystemProducer. The consumer simply forwards all
+ * configs to the JobCoordinator, which is useful for mocking in unit
+ * tests.
+ */
+public class MockCoordinatorStreamSystemFactory implements SystemFactory {
+
+  private static SystemConsumer mockConsumer = null;
+  private static boolean useCachedConsumer = false;
+  public static void enableMockConsumerCache() {
+    mockConsumer = null;
+    useCachedConsumer = true;
+  }
+
+  public static void disableMockConsumerCache() {
+    useCachedConsumer = false;
+    mockConsumer = null;
+  }
+
+  /**
+   * Returns a consumer that sends all configs to the coordinator stream.
+   * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
+   *               The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
+   *                                       ch:source:taskname -> changelogPartition for changelog
+   *               Everything else is processed as normal config
+   */
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+
+    if(useCachedConsumer && mockConsumer != null) {
+      return mockConsumer;
+    }
+
+    String jobName = config.get("job.name");
+    String jobId = config.get("job.id");
+    if (jobName == null) {
+      throw new ConfigException("Must define job.name.");
+    }
+    if (jobId == null) {
+      jobId = "1";
+    }
+    String streamName = Util.getCoordinatorStreamName(jobName, jobId);
+    SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
+    mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
+    return mockConsumer;
+  }
+
+  /**
+   * Returns a no-op producer.
+   */
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    // A do-nothing producer.
+    return new SystemProducer() {
+      public void start() {
+      }
+
+      public void stop() {
+      }
+
+      public void register(String source) {
+      }
+
+      public void send(String source, OutgoingMessageEnvelope envelope) {
+      }
+
+      public void flush(String source) {
+      }
+    };
+  }
+
+  /**
+   * Returns a single partition admin that pretends to create a coordinator
+   * stream.
+   */
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new MockSystemAdmin();
+  }
+
+  public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+    public void createCoordinatorStream(String streamName) {
+      // Do nothing.
+    }
+  }
+}

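A sketch of driving the mock from a test, using the key patterns documented
on getConsumer above (the source and task names are hypothetical):

    import org.apache.samza.config.MapConfig
    import scala.collection.JavaConversions._

    val mockConfig = new MapConfig(Map(
      "job.name" -> "test-job",
      // checkpoint: cp:<source>:<taskName> -> <ssp>:<offset>
      "cp:test-source:Partition 0" -> "kafka.PageViews.0:42",
      // changelog: ch:<source>:<taskName> -> <changelogPartitionId>
      "ch:test-source:Partition 0" -> "3"))
    // Any other key/value pair is forwarded as a normal SetConfig message.
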
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
new file mode 100644
index 0000000..00a2d59
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * A mock SystemConsumer that pretends to be a coordinator stream. The mock will
+ * take all configs given to it, and put them into the coordinator stream's
+ * SystemStreamPartition. This is useful in cases where config needs to be
+ * quickly passed from a unit test into the JobCoordinator.
+ */
+public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
+  private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
+  public final static String CHANGELOGPREFIX = "ch:";
+  public final static String CHECKPOINTPREFIX = "cp:";
+  public final CountDownLatch blockConsumerPoll = new CountDownLatch(1);
+  public boolean blockpollFlag = false;
+
+  private final SystemStreamPartition systemStreamPartition;
+  private final Config config;
+
+  public MockCoordinatorStreamWrappedConsumer(SystemStreamPartition systemStreamPartition, Config config) {
+    super();
+    this.config = config;
+    this.systemStreamPartition = systemStreamPartition;
+  }
+
+  public void start() {
+    convertConfigToCoordinatorMessage(config);
+  }
+
+  public void addMoreMessages(Config config) {
+    convertConfigToCoordinatorMessage(config);
+  }
+
+
+  private void convertConfigToCoordinatorMessage(Config config) {
+    try {
+      for (Map.Entry<String, String> configPair : config.entrySet()) {
+        byte[] keyBytes = null;
+        byte[] messageBytes = null;
+        if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) {
+          String[] checkpointInfo = configPair.getKey().split(":");
+          String[] sspOffsetPair = configPair.getValue().split(":");
+          HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
+          checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
+          Checkpoint cp = new Checkpoint(checkpointMap);
+          CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
+          keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
+          messageBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
+        }
+        else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
+          String[] changelogInfo = configPair.getKey().split(":");
+          String changeLogPartition = configPair.getValue();
+          CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
+          keyBytes = MAPPER.writeValueAsString(changelogMapping.getKeyArray()).getBytes("UTF-8");
+          messageBytes = MAPPER.writeValueAsString(changelogMapping.getMessageMap()).getBytes("UTF-8");
+        }
+        else {
+          SetConfig setConfig = new SetConfig("source", configPair.getKey(), configPair.getValue());
+          keyBytes = MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
+          messageBytes = MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
+        }
+        // The ssp here is the coordinator ssp (which is always fixed) and not the task ssp.
+        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "", keyBytes, messgeBytes));
+      }
+      setIsAtHead(systemStreamPartition, true);
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+      throws InterruptedException {
+
+    if (blockPollFlag) {
+      blockConsumerPoll.await();
+    }
+
+    return super.poll(systemStreamPartitions, timeout);
+  }
+
+  public CountDownLatch blockPool() {
+    blockPollFlag = true;
+    return blockConsumerPoll;
+  }
+
+  public void stop() {
+  }
+}

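For reference, a minimal sketch of how a unit test might drive the mock above. This is not part of the patch: MapConfig, Partition, and the "prefix:source:taskName" key layout follow the parsing in convertConfigToCoordinatorMessage, and BlockingEnvelopeMap requires the SSP to be registered before envelopes are put to it. A checkpoint entry ("cp:" prefix) would additionally need a value whose SSP portion matches what Util.stringToSsp() expects.

    // Sketch only, assumptions noted above.
    Map<String, String> entries = new HashMap<String, String>();
    entries.put("job.name", "my-job");       // plain entry -> SetConfig message
    entries.put("ch:source:taskName", "7");  // "ch:" entry -> SetChangelogMapping message
    SystemStreamPartition coordinatorSsp =
        new SystemStreamPartition("coordinator", "stream", new Partition(0));
    MockCoordinatorStreamWrappedConsumer consumer =
        new MockCoordinatorStreamWrappedConsumer(coordinatorSsp, new MapConfig(entries));
    consumer.register(coordinatorSsp, "");   // BlockingEnvelopeMap needs the SSP registered
    consumer.start();                        // each entry becomes an envelope on coordinatorSsp
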
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
new file mode 100644
index 0000000..15181bb
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.junit.Test;
+
+public class TestCoordinatorStreamMessage {
+  @Test
+  public void testCoordinatorStreamMessage() {
+    CoordinatorStreamMessage message = new CoordinatorStreamMessage("source");
+    assertEquals("source", message.getSource());
+    assertEquals(CoordinatorStreamMessage.VERSION, message.getVersion());
+    assertNotNull(message.getUsername());
+    assertTrue(message.getTimestamp() > 0);
+    assertFalse(message.isDelete());
+    CoordinatorStreamMessage secondMessage = new CoordinatorStreamMessage(message.getKeyArray(), message.getMessageMap());
+    assertEquals(secondMessage, message);
+  }
+
+  @Test
+  public void testCoordinatorStreamMessageIsDelete() {
+    CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[] {}, null);
+    assertTrue(message.isDelete());
+    assertNull(message.getMessageMap());
+  }
+
+  @Test
+  public void testSetConfig() {
+    SetConfig setConfig = new SetConfig("source", "key", "value");
+    assertEquals(SetConfig.TYPE, setConfig.getType());
+    assertEquals("key", setConfig.getKey());
+    assertEquals("value", setConfig.getConfigValue());
+    assertFalse(setConfig.isDelete());
+    assertEquals(CoordinatorStreamMessage.VERSION, setConfig.getVersion());
+  }
+
+  @Test
+  public void testDelete() {
+    Delete delete = new Delete("source2", "key", "delete-type");
+    assertEquals("delete-type", delete.getType());
+    assertEquals("key", delete.getKey());
+    assertNull(delete.getMessageMap());
+    assertTrue(delete.isDelete());
+    assertEquals(CoordinatorStreamMessage.VERSION, delete.getVersion());
+  }
+}

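The key-array/message-map round-trip asserted in testCoordinatorStreamMessage is the same contract the consumer and producer tests below rely on once JSON enters the picture. A hedged sketch of the full cycle, written as it would appear inside a JUnit test; the TypeReference shapes are assumptions mirrored from those tests:

    // Sketch only: serialize a message to UTF-8 JSON and reconstruct it.
    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
    SetConfig setConfig = new SetConfig("source", "job.name", "my-job-name");
    byte[] key = mapper.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
    byte[] msg = mapper.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
    Object[] keyArray = mapper.readValue(new String(key, "UTF-8"), new TypeReference<Object[]>() { });
    Map<String, Object> messageMap = mapper.readValue(new String(msg, "UTF-8"), new TypeReference<Map<String, Object>>() { });
    assertEquals(new CoordinatorStreamMessage(setConfig), new CoordinatorStreamMessage(keyArray, messageMap));
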
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..5e193f8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemConsumer {
+  @Test
+  public void testCoordinatorStreamSystemConsumer() {
+    Map<String, String> expectedConfig = new HashMap<String, String>();
+    expectedConfig.put("job.id", "1234");
+    SystemStream systemStream = new SystemStream("system", "stream");
+    MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+    assertFalse(systemConsumer.isRegistered());
+    consumer.register();
+    assertTrue(systemConsumer.isRegistered());
+    assertFalse(systemConsumer.isStarted());
+    consumer.start();
+    assertTrue(systemConsumer.isStarted());
+    try {
+      consumer.getConfig();
+      fail("Should have failed when retrieving config before bootstrapping.");
+    } catch (SamzaException e) {
+      // Expected.
+    }
+    consumer.bootstrap();
+    assertEquals(expectedConfig, consumer.getConfig());
+    assertFalse(systemConsumer.isStopped());
+    consumer.stop();
+    assertTrue(systemConsumer.isStopped());
+  }
+
+  private static class MockSystemConsumer implements SystemConsumer {
+    private boolean started = false;
+    private boolean stopped = false;
+    private boolean registered = false;
+    private final SystemStreamPartition expectedSystemStreamPartition;
+    private int pollCount = 0;
+
+    public MockSystemConsumer(SystemStreamPartition expectedSystemStreamPartition) {
+      this.expectedSystemStreamPartition = expectedSystemStreamPartition;
+    }
+
+    public void start() {
+      started = true;
+    }
+
+    public void stop() {
+      stopped = true;
+    }
+
+    public void register(SystemStreamPartition systemStreamPartition, String offset) {
+      registered = true;
+      assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+    }
+
+    public boolean isRegistered() {
+      return registered;
+    }
+
+    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+      assertEquals(1, systemStreamPartitions.size());
+      SystemStreamPartition systemStreamPartition = systemStreamPartitions.iterator().next();
+      assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+
+      if (pollCount++ == 0) {
+        List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
+        SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name");
+        SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
+        Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
+        list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
+        list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
+        list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap()));
+        map.put(systemStreamPartition, list);
+      }
+
+      return map;
+    }
+
+    private byte[] serialize(Object obj) {
+      try {
+        return SamzaObjectMapper.getObjectMapper().writeValueAsString(obj).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    public boolean isStarted() {
+      return started;
+    }
+
+    public boolean isStopped() {
+      return stopped;
+    }
+  }
+}
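The consumer test above pins down the bootstrap semantics: a later SetConfig for a key wins, and a Delete with type SetConfig.TYPE removes the key entirely. Conceptually the bootstrap is a fold over the stream in message order; the equivalent in-memory operations for the three messages the mock emits:

    // Equivalent fold (sketch): apply messages in stream order.
    Map<String, String> folded = new HashMap<String, String>();
    folded.put("job.name", "my-job-name"); // SetConfig("test", "job.name", "my-job-name")
    folded.put("job.id", "1234");          // SetConfig("test", "job.id", "1234")
    folded.remove("job.name");             // Delete("test", "job.name", SetConfig.TYPE)
    // folded now equals the expectedConfig asserted above: {job.id=1234}
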

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..728fa53
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemProducer {
+  @Test
+  public void testCoordinatorStreamSystemProducer() {
+    String source = "source";
+    SystemStream systemStream = new SystemStream("system", "stream");
+    MockSystemProducer systemProducer = new MockSystemProducer(source);
+    MockSystemAdmin systemAdmin = new MockSystemAdmin();
+    CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
+    SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name");
+    SetConfig setConfig2 = new SetConfig(source, "job.id", "1234");
+    Delete delete = new Delete(source, "job.name", SetConfig.TYPE);
+    assertFalse(systemProducer.isRegistered());
+    producer.register(source);
+    assertTrue(systemProducer.isRegistered());
+    assertFalse(systemProducer.isStarted());
+    producer.start();
+    assertTrue(systemProducer.isStarted());
+    producer.send(setConfig1);
+    producer.send(setConfig2);
+    producer.send(delete);
+    assertFalse(systemProducer.isStopped());
+    producer.stop();
+    assertTrue(systemProducer.isStopped());
+    List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
+    OutgoingMessageEnvelope envelope0 = envelopes.get(0);
+    OutgoingMessageEnvelope envelope1 = envelopes.get(1);
+    OutgoingMessageEnvelope envelope2 = envelopes.get(2);
+    TypeReference<Object[]> keyRef = new TypeReference<Object[]>() {
+    };
+    TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() {
+    };
+    assertEquals(3, envelopes.size());
+    assertEquals(new CoordinatorStreamMessage(setConfig1), new CoordinatorStreamMessage(deserialize((byte[]) envelope0.getKey(), keyRef), deserialize((byte[]) envelope0.getMessage(), msgRef)));
+    assertEquals(new CoordinatorStreamMessage(setConfig2), new CoordinatorStreamMessage(deserialize((byte[]) envelope1.getKey(), keyRef), deserialize((byte[]) envelope1.getMessage(), msgRef)));
+    assertEquals(new CoordinatorStreamMessage(delete), new CoordinatorStreamMessage(deserialize((byte[]) envelope2.getKey(), keyRef), deserialize((byte[]) envelope2.getMessage(), msgRef)));
+  }
+
+  private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
+    try {
+      if (bytes != null) {
+        String valueStr = new String(bytes, "UTF-8");
+        return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference);
+      }
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+
+    return null;
+  }
+
+  private static class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+    public void createCoordinatorStream(String streamName) {
+      // Do nothing.
+    }
+  }
+
+  private static class MockSystemProducer implements SystemProducer {
+    private final String expectedSource;
+    private final List<OutgoingMessageEnvelope> envelopes;
+    private boolean started = false;
+    private boolean stopped = false;
+    private boolean registered = false;
+    private boolean flushed = false;
+
+    public MockSystemProducer(String expectedSource) {
+      this.expectedSource = expectedSource;
+      this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+    }
+
+    public void start() {
+      started = true;
+    }
+
+    public void stop() {
+      stopped = true;
+    }
+
+    public void register(String source) {
+      assertEquals(expectedSource, source);
+      registered = true;
+    }
+
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      envelopes.add(envelope);
+    }
+
+    public void flush(String source) {
+      assertEquals(expectedSource, source);
+      flushed = true;
+    }
+
+    public List<OutgoingMessageEnvelope> getEnvelopes() {
+      return envelopes;
+    }
+
+    public boolean isStarted() {
+      return started;
+    }
+
+    public boolean isStopped() {
+      return stopped;
+    }
+
+    public boolean isRegistered() {
+      return registered;
+    }
+
+    public boolean isFlushed() {
+      return flushed;
+    }
+  }
+}

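The producer test decodes raw envelopes back into CoordinatorStreamMessages, which implies the production-side encoding mirrors the deserialize() helper: a UTF-8 JSON key, and a UTF-8 JSON message that is null for a delete (see testDelete above). A hedged sketch of encoding a delete; the (SystemStream, key, message) OutgoingMessageEnvelope constructor is assumed from samza-api, and the tombstone behavior of a null body on a key-compacted backing stream is an assumption, not something this patch asserts:

    // Sketch only: encode a Delete the way the tests above decode it.
    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
    Delete delete = new Delete("source", "job.name", SetConfig.TYPE);
    byte[] key = mapper.writeValueAsString(delete.getKeyArray()).getBytes("UTF-8");
    Object message = delete.getMessageMap(); // null for a delete
    OutgoingMessageEnvelope envelope =
        new OutgoingMessageEnvelope(new SystemStream("system", "stream"), key, message);
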
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 76bc681..72b134c 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -40,12 +40,12 @@ public class TestSamzaObjectMapper {
   public void testJsonTaskModel() throws Exception {
     ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
     Map<String, String> configMap = new HashMap<String, String>();
+    Map<SystemStreamPartition, String> sspOffset = new HashMap<SystemStreamPartition, String>();
     configMap.put("a", "b");
     Config config = new MapConfig(configMap);
-    Set<SystemStreamPartition> inputSystemStreamPartitions = new HashSet<SystemStreamPartition>();
-    inputSystemStreamPartitions.add(new SystemStreamPartition("foo", "bar", new Partition(1)));
     TaskName taskName = new TaskName("test");
-    TaskModel taskModel = new TaskModel(taskName, inputSystemStreamPartitions, new Partition(2));
+    sspOffset.put(new SystemStreamPartition("foo", "bar", new Partition(1)), "");
+    TaskModel taskModel = new TaskModel(taskName, sspOffset, new Partition(2));
     Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
     tasks.put(taskName, taskModel);
     ContainerModel containerModel = new ContainerModel(1, tasks);

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
index 9348c7d..41eb82e 100644
--- a/samza-core/src/test/resources/test.properties
+++ b/samza-core/src/test/resources/test.properties
@@ -22,3 +22,4 @@
 job.factory.class=org.apache.samza.job.MockJobFactory
 job.name=test-job
 foo=bar
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
\ No newline at end of file

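The new property wires a system named "coordinator" to the mock factory, following the systems.<system-name>.samza.factory convention; this is what lets TestCheckpointTool below point JobConfig.JOB_COORDINATOR_SYSTEM at "coordinator" and get the mock. A hedged sketch of the kind of reflective resolution Samza performs for such a property (an illustration of the convention, not Samza's exact code path):

    // Illustration only: resolve and use the factory configured above.
    String className = config.get("systems.coordinator.samza.factory");
    SystemFactory factory = (SystemFactory) Class.forName(className).newInstance();
    SystemConsumer consumer =
        factory.getConsumer("coordinator", config, new MetricsRegistryMap());
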
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index af800df..0ba932c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -31,8 +31,9 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mock.MockitoSugar
-
 import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = null
@@ -40,8 +41,8 @@ object TestCheckpointTool {
   var systemProducer: SystemProducer = null
   var systemAdmin: SystemAdmin = null
 
-  class MockCheckpointManagerFactory extends CheckpointManagerFactory {
-    override def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
+  class MockCheckpointManagerFactory {
+    def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
   }
 
   class MockSystemFactory extends SystemFactory {
@@ -62,15 +63,17 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
   @Before
   def setup {
     config = new MapConfig(Map(
+      JobConfig.JOB_NAME -> "test",
+      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
       TaskConfig.INPUT_STREAMS -> "test.foo",
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName
+      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
     ))
     val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
       new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
       new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
     ))
-
     TestCheckpointTool.checkpointManager = mock[CheckpointManager]
     TestCheckpointTool.systemAdmin = mock[SystemAdmin]
     when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
@@ -79,12 +82,12 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
       .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234")))
     when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
       .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321")))
-
   }
 
   @Test
   def testReadLatestCheckpoint {
-    new CheckpointTool(config, null).run
+    val checkpointTool = new CheckpointTool(config, null, TestCheckpointTool.checkpointManager)
+    checkpointTool.run
     verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
     verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
     verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
@@ -95,7 +98,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
     val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
       tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
 
-    new CheckpointTool(config, toOverwrite).run
+    val checkpointTool = new CheckpointTool(config, toOverwrite, TestCheckpointTool.checkpointManager)
+    checkpointTool.run
     verify(TestCheckpointTool.checkpointManager)
       .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
     verify(TestCheckpointTool.checkpointManager)

http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index a281e79..8d54c46 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -65,7 +65,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffsets)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
@@ -97,7 +97,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffsets)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
@@ -179,22 +179,6 @@ class TestOffsetManager {
     }
   }
 
-  @Ignore("OffsetManager.start is supposed to throw an exception - but it doesn't") @Test
-  def testShouldFailWhenMissingDefault {
-    val taskName = new TaskName("c")
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
-    offsetManager.register(taskName, Set(systemStreamPartition))
-
-    intercept[SamzaException] {
-      offsetManager.start
-    }
-  }
-
   @Test
   def testDefaultSystemShouldFailWhenFailIsSpecified {
     val systemStream = new SystemStream("test-system", "test-stream")
@@ -255,24 +239,26 @@ class TestOffsetManager {
     assertEquals(Some("13"), offsetManager.getStartingOffset(ssp))
   }
 
+
   private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
     val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
-
-    new CheckpointManager {
+    new CheckpointManager(null, null, null) {
       var isStarted = false
       var isStopped = false
       var registered = Set[TaskName]()
       var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
       var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
-      def start { isStarted = true }
-      def register(taskName: TaskName) { registered += taskName }
-      def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
-      def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
-      def stop { isStopped = true }
-
-      override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
+      override def start { isStarted = true }
+      override def register(taskName: TaskName) { registered += taskName }
+      override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+      override def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
+      override def stop { isStopped = true }
 
-      override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
+      // Only used for testing - not present in the real CheckpointManager
+      def getOffsets: util.Map[SystemStreamPartition, String] = {
+        checkpoint.getOffsets()
+      }
     }
   }
 
@@ -284,8 +270,12 @@ class TestOffsetManager {
       def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
         Map[String, SystemStreamMetadata]()
 
-      override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
-        new SamzaException("Method not implemented")
+      override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+        throw new UnsupportedOperationException("Method not implemented.")
+      }
+
+      override def createCoordinatorStream(streamName: String) {
+        throw new UnsupportedOperationException("Method not implemented.")
       }
     }
   }