You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/28 22:08:38 UTC

[2/2] git commit: SAMZA-444; provide a samza job data model for job coordinator

SAMZA-444; provide a samza job data model for job coordinator


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/6f595bed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/6f595bed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/6f595bed

Branch: refs/heads/master
Commit: 6f595beda2482ab85c47f40ae8345c2591007367
Parents: f6d3415
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Oct 28 09:04:14 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Oct 28 09:04:14 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/job/model/ContainerModel.java  |  92 ++++++++
 .../org/apache/samza/job/model/JobModel.java    |  90 ++++++++
 .../org/apache/samza/job/model/TaskModel.java   | 108 +++++++++
 .../model/JsonContainerModelMixIn.java          |  42 ++++
 .../serializers/model/JsonJobModelMixIn.java    |  41 ++++
 .../serializers/model/JsonTaskModelMixIn.java   |  45 ++++
 .../serializers/model/SamzaObjectMapper.java    | 198 ++++++++++++++++
 .../samza/checkpoint/CheckpointTool.scala       |   9 +-
 .../serializers/JsonConfigSerializer.scala      |  39 ----
 .../apache/samza/container/SamzaContainer.scala |  75 ++++---
 .../TaskNamesToSystemStreamPartitions.scala     | 145 ------------
 .../grouper/task/GroupByContainerCount.scala    |  47 ++--
 .../grouper/task/TaskNameGrouper.scala          |  35 ++-
 .../samza/coordinator/JobCoordinator.scala      | 225 +++++++++++++++++++
 .../samza/coordinator/server/HttpServer.scala   |  57 ++++-
 .../samza/coordinator/server/JobServlet.scala   |  50 +----
 .../samza/coordinator/server/ServletBase.scala  |  26 ++-
 .../org/apache/samza/job/local/ProcessJob.scala |  13 +-
 .../samza/job/local/ProcessJobFactory.scala     |  60 +++--
 .../samza/job/local/ThreadJobFactory.scala      |  30 +--
 .../org/apache/samza/util/JsonHelpers.scala     |  93 --------
 .../main/scala/org/apache/samza/util/Util.scala | 225 ++-----------------
 .../model/TestSamzaObjectMapper.java            |  59 +++++
 .../samza/container/TestSamzaContainer.scala    |  64 +++---
 .../TestTaskNamesToSystemStreamPartitions.scala |  71 ------
 .../task/TestGroupByContainerCount.scala        |  65 ++++--
 .../samza/coordinator/TestJobCoordinator.scala  | 119 ++++++++++
 .../scala/org/apache/samza/util/TestUtil.scala  |  70 +-----
 .../resources/scalate/WEB-INF/views/index.scaml |  10 +-
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |   7 +-
 .../samza/job/yarn/SamzaAppMasterService.scala  |  12 +-
 .../samza/job/yarn/SamzaAppMasterState.scala    |   7 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  15 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   4 +-
 .../webapp/ApplicationMasterRestServlet.scala   |  29 ++-
 .../job/yarn/TestSamzaAppMasterService.scala    |  16 +-
 .../yarn/TestSamzaAppMasterTaskManager.scala    |  36 +--
 37 files changed, 1393 insertions(+), 936 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
new file mode 100644
index 0000000..98a34bc
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.samza.container.TaskName;
+
+/**
+ * <p>
+ * The data model is used to define which TaskModels a SamzaContainer should
+ * process. The model is used in the job coordinator and SamzaContainer to
+ * determine how to execute Samza jobs.
+ * </p>
+ * 
+ * <p>
+ * The hierarchy of Samza's job data model is that jobs have containers, and
+ * containers have tasks. Each data model contains relevant information, such as
+ * an id, partition information, etc.
+ * </p>
+ */
+public class ContainerModel implements Comparable<ContainerModel> {
+  private final int containerId;
+  private final Map<TaskName, TaskModel> tasks; // unmodifiable view set up in the constructor
+
+  /** Builds a model for the given container id; the task map is wrapped in an unmodifiable view, not copied. */
+  public ContainerModel(int containerId, Map<TaskName, TaskModel> tasks) {
+    this.containerId = containerId;
+    this.tasks = Collections.unmodifiableMap(tasks);
+  }
+
+  /** @return the container id that was passed to the constructor. */
+  public int getContainerId() {
+    return containerId;
+  }
+
+  /** @return an unmodifiable view of this container's tasks, keyed by task name. */
+  public Map<TaskName, TaskModel> getTasks() {
+    return tasks;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerModel [containerId=" + containerId + ", tasks=" + tasks + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    // Same value as folding containerId and then tasks into 1 with the multiplier 31.
+    return 31 * (31 + containerId) + ((tasks == null) ? 0 : tasks.hashCode());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+    ContainerModel other = (ContainerModel) obj;
+    if (containerId != other.containerId)
+      return false;
+    return (tasks == null) ? other.tasks == null : tasks.equals(other.tasks);
+  }
+
+  /**
+   * Orders container models by ascending container id.
+   * @param other the model to compare against; must not be null.
+   * @return a negative value, zero, or a positive value as this id is less than, equal to, or greater than the other id.
+   */
+  public int compareTo(ContainerModel other) {
+    // Compare explicitly instead of subtracting: the difference of two ints can overflow and flip the sign.
+    int otherId = other.getContainerId();
+    return (containerId < otherId) ? -1 : ((containerId == otherId) ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
new file mode 100644
index 0000000..c2b49c4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.samza.config.Config;
+
+/**
+ * <p>
+ * The data model used to represent a Samza job. The model is used in the job
+ * coordinator and SamzaContainer to determine how to execute Samza jobs.
+ * </p>
+ * 
+ * <p>
+ * The hierarchy of Samza's job data model is that jobs have containers, and
+ * containers have tasks. Each data model contains relevant information, such as
+ * an id, partition information, etc.
+ * </p>
+ */
+public class JobModel {
+  private final Config config;
+  private final Map<Integer, ContainerModel> containers;
+
+  /** Stores the config as given and exposes the container map through an unmodifiable view. */
+  public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+    this.config = config;
+    // Wrapped, not copied: the model only prevents mutation through its own accessor.
+    this.containers = Collections.unmodifiableMap(containers);
+  }
+
+  /** @return the config passed to the constructor. */
+  public Config getConfig() {
+    return config;
+  }
+
+  /** @return an unmodifiable view of the containers, keyed by Integer (presumably the container id). */
+  public Map<Integer, ContainerModel> getContainers() {
+    return containers;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("JobModel [config=");
+    text.append(config).append(", containers=").append(containers).append("]");
+    return text.toString();
+  }
+
+  @Override
+  public int hashCode() {
+    int configHash = (config == null) ? 0 : config.hashCode();
+    int containersHash = (containers == null) ? 0 : containers.hashCode();
+    // Equivalent to folding both hashes into 1 with the multiplier 31.
+    return 31 * (31 + configHash) + containersHash;
+  }
+
+  /** Two job models are equal when they have the same runtime class, config and containers. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+    JobModel that = (JobModel) obj;
+    boolean sameConfig = sameOrBothNull(config, that.config);
+    return sameConfig && sameOrBothNull(containers, that.containers);
+  }
+
+  /** Null-safe equality: true when both are null, or when left is non-null and equals right. */
+  private static boolean sameOrBothNull(Object left, Object right) {
+    return (left == null) ? right == null : left.equals(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
new file mode 100644
index 0000000..eb22d2e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * <p>
+ * The data model used to represent a task. The model is used in the job
+ * coordinator and SamzaContainer to determine how to execute Samza jobs.
+ * </p>
+ * 
+ * <p>
+ * The hierarchy of Samza's job data model is that jobs have containers, and
+ * containers have tasks. Each data model contains relevant information, such as
+ * an id, partition information, etc.
+ * </p>
+ */
+public class TaskModel implements Comparable<TaskModel> {
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> systemStreamPartitions;
+  private final Partition changelogPartition;
+
+  /** Keeps the name and changelog partition as given; the partition set is wrapped in an unmodifiable view. */
+  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+    this.taskName = taskName;
+    this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
+    this.changelogPartition = changelogPartition;
+  }
+
+  /** @return the task name passed to the constructor. */
+  public TaskName getTaskName() {
+    return taskName;
+  }
+
+  /** @return an unmodifiable view of the system stream partitions passed to the constructor. */
+  public Set<SystemStreamPartition> getSystemStreamPartitions() {
+    return systemStreamPartitions;
+  }
+
+  /** @return the changelog partition passed to the constructor. */
+  public Partition getChangelogPartition() {
+    return changelogPartition;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("TaskModel [taskName=").append(taskName);
+    text.append(", systemStreamPartitions=").append(systemStreamPartitions);
+    text.append(", changeLogPartition=").append(changelogPartition);
+    return text.append("]").toString();
+  }
+
+  @Override
+  public int hashCode() {
+    // Fold the fields in the order changelogPartition, systemStreamPartitions, taskName.
+    Object[] fields = { changelogPartition, systemStreamPartitions, taskName };
+    int hash = 1;
+    for (Object field : fields) {
+      hash = 31 * hash + ((field == null) ? 0 : field.hashCode());
+    }
+    return hash;
+  }
+
+  /** Two task models are equal when they share runtime class, changelog partition, partitions and name. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null || getClass() != obj.getClass())
+      return false;
+    TaskModel that = (TaskModel) obj;
+    return sameOrBothNull(changelogPartition, that.changelogPartition)
+        && sameOrBothNull(systemStreamPartitions, that.systemStreamPartitions)
+        && sameOrBothNull(taskName, that.taskName);
+  }
+
+  /** Null-safe equality: true when both are null, or when left is non-null and equals right. */
+  private static boolean sameOrBothNull(Object left, Object right) {
+    return (left == null) ? right == null : left.equals(right);
+  }
+
+  /** Orders task models by delegating to the comparison of their task names. */
+  public int compareTo(TaskModel other) {
+    return taskName.compareTo(other.getTaskName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
new file mode 100644
index 0000000..f197a95
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.util.Map;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskModel;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert Samza's ContainerModel to/from JSON. It declares no logic of its own, only the JSON property names.
+ */
+public abstract class JsonContainerModelMixIn {
+  @JsonCreator
+  public JsonContainerModelMixIn(@JsonProperty("container-id") int containerId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
+  } // empty body: the constructor exists only to carry the creator and property annotations
+
+  @JsonProperty("container-id")
+  abstract int getContainerId(); // bound to the "container-id" JSON property
+
+  @JsonProperty("tasks")
+  abstract Map<TaskName, TaskModel> getTasks(); // bound to the "tasks" JSON property
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
new file mode 100644
index 0000000..037b5e2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.ContainerModel;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert Samza's JobModel to/from JSON. It declares no logic of its own, only the JSON property names.
+ */
+public abstract class JsonJobModelMixIn {
+  @JsonCreator
+  public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<Integer, ContainerModel> containers) {
+  } // empty body: the constructor exists only to carry the creator and property annotations
+
+  @JsonProperty("config")
+  abstract Config getConfig(); // bound to the "config" JSON property
+
+  @JsonProperty("containers")
+  abstract Map<Integer, ContainerModel> getContainers(); // bound to the "containers" JSON property
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
new file mode 100644
index 0000000..7dc431c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert Samza's TaskModel to/from JSON. It declares no logic of its own, only the JSON property names.
+ */
+public abstract class JsonTaskModelMixIn {
+  @JsonCreator
+  public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
+  } // empty body: the constructor exists only to carry the creator and property annotations
+
+  @JsonProperty("task-name")
+  abstract TaskName getTaskName(); // bound to the "task-name" JSON property
+
+  @JsonProperty("system-stream-partitions")
+  abstract Set<SystemStreamPartition> getSystemStreamPartitions(); // bound to the "system-stream-partitions" JSON property
+
+  @JsonProperty("changelog-partition")
+  abstract Partition getChangelogPartition(); // bound to the "changelog-partition" JSON property
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
new file mode 100644
index 0000000..3517912
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.ObjectCodec;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.DeserializationContext;
+import org.codehaus.jackson.map.JsonDeserializer;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.MapperConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.introspect.AnnotatedField;
+import org.codehaus.jackson.map.introspect.AnnotatedMethod;
+import org.codehaus.jackson.map.module.SimpleModule;
+import org.codehaus.jackson.type.TypeReference;
+
+/**
+ * <p>
+ * A collection of utility classes and (de)serializers to make Samza's job model
+ * work with Jackson. Rather than annotating Samza's job model directly, the
+ * Jackson-specific code is isolated so that Samza's core data model does not
+ * require a direct dependency on Jackson.
+ * </p>
+ * 
+ * <p>
+ * To use Samza's job data model, use the SamzaObjectMapper.getObjectMapper()
+ * method.
+ * </p>
+ */
+public class SamzaObjectMapper {
+  private static final ObjectMapper OBJECT_MAPPER = getObjectMapper();
+
+  /**
+   * @return Returns a new ObjectMapper that's been configured to (de)serialize
+   *         Samza's job data model, and simple data types such as TaskName,
+   *         Partition, Config, and SystemStreamPartition.
+   */
+  public static ObjectMapper getObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    SimpleModule module = new SimpleModule("SamzaModule", new Version(1, 0, 0, ""));
+
+    // Setup custom serdes for simple data types.
+    module.addSerializer(Partition.class, new PartitionSerializer());
+    module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
+    module.addSerializer(TaskName.class, new TaskNameSerializer());
+    module.addDeserializer(TaskName.class, new TaskNameDeserializer()); // pairs with TaskNameSerializer above
+    module.addDeserializer(Partition.class, new PartitionDeserializer());
+    module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
+    module.addDeserializer(Config.class, new ConfigDeserializer());
+
+    // Setup mixins for data models.
+    mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
+    mapper.getSerializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
+    mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+
+    // Convert camel case to hyphenated field names, and register the module.
+    mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
+    mapper.registerModule(module);
+    return mapper;
+  }
+
+  /** Reads a JSON object of string keys and string values into a MapConfig. */
+  public static class ConfigDeserializer extends JsonDeserializer<Config> {
+    @Override
+    public Config deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+      return new MapConfig(OBJECT_MAPPER.<Map<String, String>> readValue(node, new TypeReference<Map<String, String>>() { }));
+    }
+  }
+
+  /** Writes a Partition as its integer partition id. */
+  public static class PartitionSerializer extends JsonSerializer<Partition> {
+    @Override
+    public void serialize(Partition partition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeObject(Integer.valueOf(partition.getPartitionId()));
+    }
+  }
+
+  /** Builds a Partition from the int value of the JSON node. */
+  public static class PartitionDeserializer extends JsonDeserializer<Partition> {
+    @Override
+    public Partition deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+      return new Partition(node.getIntValue());
+    }
+  }
+
+  /** Writes a TaskName as the string returned by its toString(). */
+  public static class TaskNameSerializer extends JsonSerializer<TaskName> {
+    @Override
+    public void serialize(TaskName taskName, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeObject(taskName.toString());
+    }
+  }
+
+  /** Builds a TaskName from the text value of the JSON node. */
+  public static class TaskNameDeserializer extends JsonDeserializer<TaskName> {
+    @Override
+    public TaskName deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+      return new TaskName(node.getTextValue());
+    }
+  }
+
+  /** Writes a SystemStreamPartition as a map with "system", "stream" and "partition" entries. */
+  public static class SystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> {
+    @Override
+    public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+      Map<String, Object> systemStreamPartitionMap = new HashMap<String, Object>();
+      systemStreamPartitionMap.put("system", systemStreamPartition.getSystem());
+      systemStreamPartitionMap.put("stream", systemStreamPartition.getStream());
+      systemStreamPartitionMap.put("partition", systemStreamPartition.getPartition());
+      jsonGenerator.writeObject(systemStreamPartitionMap);
+    }
+  }
+
+  /** Rebuilds a SystemStreamPartition from the "system", "stream" and "partition" fields of the JSON node. */
+  public static class SystemStreamPartitionDeserializer extends JsonDeserializer<SystemStreamPartition> {
+    @Override
+    public SystemStreamPartition deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+      String system = node.get("system").getTextValue();
+      String stream = node.get("stream").getTextValue();
+      return new SystemStreamPartition(system, stream, new Partition(node.get("partition").getIntValue()));
+    }
+  }
+
+  /**
+   * A Jackson property naming strategy that converts camel case JSON fields to
+   * hyphenated names. For example, myVariableName would be converted to
+   * my-variable-name.
+   */
+  public static class CamelCaseToDashesStrategy extends PropertyNamingStrategy {
+    @Override
+    public String nameForField(MapperConfig<?> config, AnnotatedField field, String defaultName) {
+      return convert(defaultName);
+    }
+
+    @Override
+    public String nameForGetterMethod(MapperConfig<?> config, AnnotatedMethod method, String defaultName) {
+      return convert(defaultName);
+    }
+
+    @Override
+    public String nameForSetterMethod(MapperConfig<?> config, AnnotatedMethod method, String defaultName) {
+      return convert(defaultName);
+    }
+
+    public String convert(String defaultName) {
+      StringBuilder builder = new StringBuilder();
+      for (char c : defaultName.toCharArray()) {
+        if (Character.isUpperCase(c)) {
+          builder.append('-').append(Character.toLowerCase(c));
+        } else {
+          builder.append(c);
+        }
+      }
+      return builder.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 64a5078..ddc30af 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -32,6 +32,7 @@ import org.apache.samza.util.{CommandLine, Util}
 import org.apache.samza.{Partition, SamzaException}
 import scala.collection.JavaConversions._
 import org.apache.samza.util.Logging
+import org.apache.samza.coordinator.JobCoordinator
 
 /**
  * Command-line tool for inspecting and manipulating the checkpoints for a job.
@@ -136,7 +137,13 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extend
     info("Using %s" format manager)
 
     // Find all the TaskNames that would be generated for this job config
-    val taskNames = Util.assignContainerToSSPTaskNames(config, 1).get(0).get.keys.toSet
+    val coordinator = JobCoordinator(config, 1)
+    val taskNames = coordinator
+      .jobModel
+      .getContainers
+      .values
+      .flatMap(_.getTasks.keys)
+      .toSet
 
     taskNames.foreach(manager.register)
     manager.start

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala b/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
deleted file mode 100644
index 60e65ea..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config.serializers
-import scala.collection.JavaConversions._
-
-import org.codehaus.jackson.map.ObjectMapper
-
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-
-import java.util.HashMap
-
-object JsonConfigSerializer {
-  val jsonMapper = new ObjectMapper()
-
-  def fromJson(string: String): Config = {
-    val map = jsonMapper.readValue(string, classOf[HashMap[String, String]])
-    new MapConfig(map)
-  }
-
-  def toJson(config: Config) = jsonMapper.writeValueAsString(new HashMap[String, String](config))
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d0c9004..5885a88 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -31,7 +31,6 @@ import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.serializers.JsonConfigSerializer
 import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -60,9 +59,12 @@ import org.apache.samza.task.TaskLifecycleListenerFactory
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
-import org.apache.samza.util.JsonHelpers
 import java.net.URL
 import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.job.model.JobModel
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -76,9 +78,11 @@ object SamzaContainer extends Logging {
     try {
       val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
       val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
-      val (config, sspTaskNames, taskNameToChangeLogPartitionMapping) = getCoordinatorObjects(coordinatorUrl)
+      val jobModel = readJobModel(coordinatorUrl)
+      val containerModel = jobModel.getContainers()(containerId.toInt)
+      val config = jobModel.getConfig
 
-      SamzaContainer(containerId, sspTaskNames(containerId), taskNameToChangeLogPartitionMapping, config).run
+      SamzaContainer(containerModel, config).run
     } finally {
       jmxServer.stop
     }
@@ -89,34 +93,41 @@ object SamzaContainer extends Logging {
    * assignments, and returns objects to be used for SamzaContainer's
    * constructor.
    */
-  def getCoordinatorObjects(coordinatorUrl: String) = {
-    info("Fetching configuration from: %s" format coordinatorUrl)
-    val rawCoordinatorObjects = JsonHelpers.deserializeCoordinatorBody(Util.read(new URL(coordinatorUrl)))
-    val rawConfig = rawCoordinatorObjects.get(JobServlet.CONFIG).asInstanceOf[java.util.Map[String, String]]
-    val rawContainers = rawCoordinatorObjects.get(JobServlet.CONTAINERS).asInstanceOf[java.util.Map[String, java.util.Map[String, java.util.List[java.util.Map[String, Object]]]]]
-    val rawTaskChangelogMapping = rawCoordinatorObjects.get(JobServlet.TASK_CHANGELOG_MAPPING).asInstanceOf[java.util.Map[String, java.lang.Integer]]
-    val config = JsonHelpers.convertCoordinatorConfig(rawConfig)
-    val sspTaskNames = JsonHelpers.convertCoordinatorSSPTaskNames(rawContainers)
-    val taskNameToChangeLogPartitionMapping = JsonHelpers.convertCoordinatorTaskNameChangelogPartitions(rawTaskChangelogMapping)
-    (config, sspTaskNames, taskNameToChangeLogPartitionMapping)
+  def readJobModel(url: String) = {
+    info("Fetching configuration from: %s" format url)
+    SamzaObjectMapper
+      .getObjectMapper
+      .readValue(Util.read(new URL(url)), classOf[JobModel])
   }
 
-  def apply(containerId: Int, sspTaskNames: TaskNamesToSystemStreamPartitions, taskNameToChangeLogPartitionMapping: Map[TaskName, Int], config: Config) = {
+  def apply(containerModel: ContainerModel, config: Config) = {
+    val containerId = containerModel.getContainerId
     val containerName = "samza-container-%s" format containerId
     val containerPID = Util.getContainerPID
 
     info("Setting up Samza container: %s" format containerName)
     info("Samza container PID: %s" format containerPID)
     info("Using configuration: %s" format config)
-    info("Using tasks: %s" format sspTaskNames)
-    info("Using task changelogs: %s" format taskNameToChangeLogPartitionMapping)
+    info("Using container model: %s" format containerModel)
 
     val registry = new MetricsRegistryMap(containerName)
     val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry)
     val systemProducersMetrics = new SystemProducersMetrics(registry)
     val systemConsumersMetrics = new SystemConsumersMetrics(registry)
 
-    val inputSystems = sspTaskNames.getAllSystems()
+    val inputSystemStreamPartitions = containerModel
+      .getTasks
+      .values
+      .flatMap(_.getSystemStreamPartitions)
+      .toSet
+
+    val inputSystemStreams = inputSystemStreamPartitions
+      .map(_.getSystemStream)
+      .toSet
+
+    val inputSystems = inputSystemStreams
+      .map(_.getSystem)
+      .toSet
 
     val systemNames = config.getSystemNames
 
@@ -144,7 +155,7 @@ object SamzaContainer extends Logging {
     info("Got system factories: %s" format systemFactories.keys)
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
-    val inputStreamMetadata = streamMetadataCache.getStreamMetadata(sspTaskNames.getAllSystemStreams)
+    val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams)
 
     info("Got input stream metadata: %s" format inputStreamMetadata)
 
@@ -213,7 +224,7 @@ object SamzaContainer extends Logging {
      * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps.
      */
     val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => {
-      (serdeStreams ++ sspTaskNames.getAllSSPs())
+      (serdeStreams ++ inputSystemStreamPartitions)
         .filter(systemStream => getSerdeName(systemStream).isDefined)
         .map(systemStream => {
           val serdeName = getSerdeName(systemStream).get
@@ -380,12 +391,18 @@ object SamzaContainer extends Logging {
 
     // Wire up all task-instance-level (unshared) objects.
 
-    val taskNames = sspTaskNames.keys.toSet
+    val taskNames = containerModel
+      .getTasks
+      .values
+      .map(_.getTaskName)
+      .toSet
 
     val containerContext = new SamzaContainerContext(containerId, config, taskNames)
 
-    val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => {
-      debug("Setting up task instance: %s" format taskName)
+    val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
+      debug("Setting up task instance: %s" format taskModel)
+
+      val taskName = taskModel.getTaskName
 
       val task = Util.getObj[StreamTask](taskClassName)
 
@@ -404,13 +421,11 @@ object SamzaContainer extends Logging {
 
       info("Got store consumers: %s" format storeConsumers)
 
-      val partitionForThisTaskName = new Partition(taskNameToChangeLogPartitionMapping(taskName))
-
       val taskStores = storageEngineFactories
         .map {
           case (storeName, storageEngineFactory) =>
             val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) {
-              new SystemStreamPartition(changeLogSystemStreams(storeName), partitionForThisTaskName)
+              new SystemStreamPartition(changeLogSystemStreams(storeName), taskModel.getChangelogPartition)
             } else {
               null
             }
@@ -437,7 +452,7 @@ object SamzaContainer extends Logging {
 
       info("Got task stores: %s" format taskStores)
 
-      val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata)
+      val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(taskModel.getChangelogPartition, changeLogMetadata)
 
       info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
 
@@ -448,9 +463,11 @@ object SamzaContainer extends Logging {
         changeLogSystemStreams = changeLogSystemStreams,
         changeLogOldestOffsets = changeLogOldestOffsets,
         storeBaseDir = storeBaseDir,
-        partitionForThisTaskName)
+        partition = taskModel.getChangelogPartition)
 
-      val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames))
+      val systemStreamPartitions = taskModel
+        .getSystemStreamPartitions
+        .toSet
 
       info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName)
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
deleted file mode 100644
index da15346..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container
-
-import org.apache.samza.util.Logging
-import org.apache.samza.SamzaException
-import org.apache.samza.system.{SystemStream, SystemStreamPartition}
-import scala.collection.{immutable, Map, MapLike}
-
-/**
- * Map of {@link TaskName} to its set of {@link SystemStreamPartition}s with additional methods for aggregating
- * those SystemStreamPartitions' individual system, streams and partitions.  Is useful for highlighting this
- * particular, heavily used map within the code.
- *
- * @param m Original map of TaskNames to SystemStreamPartitions
- */
-class TaskNamesToSystemStreamPartitions(m:Map[TaskName, Set[SystemStreamPartition]] = Map[TaskName, Set[SystemStreamPartition]]())
-  extends Map[TaskName, Set[SystemStreamPartition]]
-  with MapLike[TaskName, Set[SystemStreamPartition], TaskNamesToSystemStreamPartitions] with Logging {
-
-  // Constructor
-  validate
-
-  // Methods
-
-  // TODO: Get rid of public constructor, rely entirely on the companion object
-  override def -(key: TaskName): TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions(m - key)
-
-  override def +[B1 >: Set[SystemStreamPartition]](kv: (TaskName, B1)): Map[TaskName, B1] = new TaskNamesToSystemStreamPartitions(m + kv.asInstanceOf[(TaskName, Set[SystemStreamPartition])])
-
-  override def iterator: Iterator[(TaskName, Set[SystemStreamPartition])] = m.iterator
-
-  override def get(key: TaskName): Option[Set[SystemStreamPartition]] = m.get(key)
-
-  override def empty: TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions()
-
-  override def seq: Map[TaskName, Set[SystemStreamPartition]] = m.seq
-
-  override def foreach[U](f: ((TaskName, Set[SystemStreamPartition])) => U): Unit = m.foreach(f)
-
-  override def size: Int = m.size
-
-  /**
-   * Validate that this is a legal mapping of TaskNames to SystemStreamPartitions.  At the moment,
-   * we only check that an SSP is included in the mapping at most once.  We could add other,
-   * pluggable validations here, or if we decided to allow an SSP to appear in the mapping more than
-   * once, remove this limitation.
-   */
-  def validate():Unit = {
-    // Convert sets of SSPs to lists, to preserve duplicates
-    val allSSPs: List[SystemStreamPartition] = m.values.toList.map(_.toList).flatten
-    val sspCountMap = allSSPs.groupBy(ssp => ssp)  // Group all the SSPs together
-      .map(ssp => (ssp._1 -> ssp._2.size))         // Turn into map -> count of that SSP
-      .filter(ssp => ssp._2 != 1)                  // Filter out those that appear once
-
-    if(!sspCountMap.isEmpty) {
-      throw new SamzaException("Assigning the same SystemStreamPartition to multiple TaskNames is not currently supported." +
-        "  Out of compliance SystemStreamPartitions and counts: " + sspCountMap)
-    }
-
-    debug("Successfully validated TaskName to SystemStreamPartition set mapping:" + m)
-  }
-
-  /**
-   * Return a set of all the SystemStreamPartitions for all the keys.
-   *
-   * @return All SystemStreamPartitions within this map
-   */
-  def getAllSSPs(): Iterable[SystemStreamPartition] = m.values.flatten
-
-  /**
-   * Return a set of all the Systems presents in the SystemStreamPartitions across all the keys
-   *
-   * @return All Systems within this map
-   */
-  def getAllSystems(): Set[String] = getAllSSPs.map(_.getSystemStream.getSystem).toSet
-
-  /**
-   * Return a set of all the Partition IDs in the SystemStreamPartitions across all the keys
-   *
-   * @return All Partition IDs within this map
-   */
-  def getAllPartitionIds(): Set[Int] = getAllSSPs.map(_.getPartition.getPartitionId).toSet
-
-  /**
-   * Return a set of all the Streams in the SystemStreamPartitions across all the keys
-   *
-   * @return All Streams within this map
-   */
-  def getAllStreams(): Set[String] = getAllSSPs.map(_.getSystemStream.getStream).toSet
-
-  /**
-   * Return a set of all the SystemStreams in the SystemStreamPartitions across all the keys
-   *
-   * @return All SystemStreams within this map
-   */
-  def getAllSystemStreams: Set[SystemStream] = getAllSSPs().map(_.getSystemStream).toSet
-
-  // CommandBuilder needs to get a copy of this map and is a Java interface, therefore we can't just go straight
-  // from this type to JSON (for passing into the command option.
-  // Not super crazy about having the Java -> Scala and Scala -> Java methods in two different (but close) places:
-  // here and in the apply method on the companion object.  May be better to just have a conversion util, but would
-  // be less clean.  Life is cruel on the border of Scalapolis and Javatown.
-  def getJavaFriendlyType: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]] = {
-    import scala.collection.JavaConverters._
-
-    m.map({case(k,v) => k -> v.asJava}).toMap.asJava
-  }
-}
-
-object TaskNamesToSystemStreamPartitions {
-  def apply() = new TaskNamesToSystemStreamPartitions()
-
-  def apply(m: Map[TaskName, Set[SystemStreamPartition]]) = new TaskNamesToSystemStreamPartitions(m)
-
-  /**
-   * Convert from Java-happy type we obtain from the SSPTaskName factory
-   *
-   * @param m Java version of a map of sets of strings
-   * @return Populated SSPTaskName map
-   */
-  def apply(m: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]) = {
-    import scala.collection.JavaConversions._
-
-    val rightType: immutable.Map[TaskName, Set[SystemStreamPartition]] = m.map({case(k,v) => k -> v.toSet}).toMap
-
-    new TaskNamesToSystemStreamPartitions(rightType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
index 7a3ba46..8071fec 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
@@ -16,35 +16,42 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.container.grouper.task
 
-import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConversions._
 
 /**
- * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames
- * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be).
- * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution
- * of the number of taskNames between containers, etc.
+ * Group tasks into containers round-robin: the tasks are sorted using
+ * TaskModel.compareTo, numbered in that order, and each task is assigned to
+ * the container whose ID is the task's number modulo the container count.
+ * No consideration is given towards locality, even distribution
+ * of aggregate SSPs within a container, even distribution of the number of
+ * taskNames between containers, etc.
  */
-class GroupByContainerCount(numContainers:Int) extends TaskNameGrouper {
+class GroupByContainerCount(numContainers: Int) extends TaskNameGrouper {
   require(numContainers > 0, "Must have at least one container")
 
-  override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
-    val keySize = taskNames.keySet.size
-    require(keySize > 0, "Must have some SSPs to group, but found none")
-
-    // Iterate through the taskNames, round-robining them per container
-    val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
-    var idx = 0
-    for(taskName <- taskNames.iterator) {
-      val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
-      idx = (idx + 1) % numContainers
-
-      currMap += taskName
-    }
+  override def group(tasks: Set[TaskModel]): Set[ContainerModel] = {
+    require(tasks.size > 0, "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.")
+    require(tasks.size >= numContainers, "Your container count (%s) is larger than your task count (%s). Can't have containers with nothing to do, so aborting." format (numContainers, tasks.size))
 
-    byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
+    tasks
+      .toList
+      // Sort tasks by taskName.
+      .sortWith { case (task1, task2) => task1.compareTo(task2) < 0 }
+      // Assign every task an ID.
+      .zip(0 until tasks.size)
+      // Map every task to a container using its task ID.
+      .groupBy(_._2 % numContainers)
+      // Take just TaskModel and remove task IDs.
+      .mapValues(_.map { case (task, taskId) => (task.getTaskName, task) }.toMap)
+      .map { case (containerId, tasks) => new ContainerModel(containerId, tasks) }
+      .toSet
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
index 46e75b1..62e94ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
@@ -18,23 +18,34 @@
  */
 package org.apache.samza.container.grouper.task
 
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.job.model.ContainerModel
 
 /**
- * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of
- * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}, we can then map those groupings onto
- * the {@link org.apache.samza.container.SamzaContainer}s on which they will run.  This class takes
- * those groupings-of-SSPs and groups them together on which container each should run on.  A simple
- * implementation could assign each TaskNamesToSystemStreamPartition to a separate container.  More
- * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them
- * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc.
+ * <p>
+ * After the input SystemStreamPartitions have been mapped to their tasks by an
+ * implementation of
+ * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}
+ * , we can then map those groupings into the
+ * {@link org.apache.samza.container.SamzaContainer}s on which they will run.
+ * This class takes a set of TaskModels and groups them together into
+ * ContainerModels. All tasks within a single ContainerModel will be executed in
+ * a single SamzaContainer.
+ * </p>
+ *
+ * <p>
+ * A simple implementation could assign each TaskModel to a separate container.
+ * More advanced implementations could examine the TaskModel to group them by
+ * data locality, anti-affinity, even distribution of expected bandwidth
+ * consumption, etc.
+ * </p>
  */
 trait TaskNameGrouper {
   /**
-   * Group TaskNamesToSystemStreamPartitions onto the containers they will share
+   * Group tasks into the containers they will share.
    *
-   * @param taskNames Pre-grouped SSPs
-   * @return Mapping of container ID to set if TaskNames it will run
+   * @param tasks Set of tasks to group into containers.
+   * @return Set of containers, which contain the tasks that were passed in.
    */
-  def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions]
+  def group(tasks: Set[TaskModel]): Set[ContainerModel]
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
new file mode 100644
index 0000000..c14f2f6
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator
+
+import org.apache.samza.config.Config
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.SamzaException
+import org.apache.samza.container.grouper.task.GroupByContainerCount
+import org.apache.samza.util.Util
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
+import java.util
+import org.apache.samza.container.TaskName
+import org.apache.samza.util.Logging
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.config.StorageConfig.Config2Storage
+import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.Partition
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import java.net.URL
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.coordinator.server.JobServlet
+
+object JobCoordinator extends Logging {
+  /**
+   * Build a JobCoordinator using a Samza job's configuration.
+   */
+  def apply(config: Config, containerCount: Int) = {
+    val jobModel = buildJobModel(config, containerCount)
+    val server = new HttpServer
+    server.addServlet("/*", new JobServlet(jobModel))
+    new JobCoordinator(jobModel, server)
+  }
+
+  /**
+   * Gets a CheckpointManager from the configuration.
+   */
+  def getCheckpointManager(config: Config) = {
+    config.getCheckpointManagerFactory match {
+      case Some(checkpointFactoryClassName) =>
+        Util
+          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+          .getCheckpointManager(config, new MetricsRegistryMap)
+      case _ =>
+        if (!config.getStoreNames.isEmpty) {
+          throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified.  " +
+            "Unable to start job as there would be no place to store changelog partition mapping.")
+        }
+        null
+    }
+  }
+
+  /**
+   * For each input stream specified in config, exactly determine its
+   * partitions, returning a set of SystemStreamPartitions containing them all.
+   */
+  def getInputStreamPartitions(config: Config) = {
+    val inputSystemStreams = config.getInputStreams
+    val systemNames = config.getSystemNames.toSet
+
+    // Map the name of each system to the corresponding SystemAdmin
+    val systemAdmins = systemNames.map(systemName => {
+      val systemFactoryClassName = config
+        .getSystemFactory(systemName)
+        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+      systemName -> systemFactory.getAdmin(systemName, config)
+    }).toMap
+
+    // Get the set of partitions for each SystemStream from the stream metadata
+    new StreamMetadataCache(systemAdmins)
+      .getStreamMetadata(inputSystemStreams)
+      .flatMap {
+        case (systemStream, metadata) =>
+          metadata
+            .getSystemStreamPartitionMetadata
+            .keys
+            .map(new SystemStreamPartition(systemStream, _))
+      }.toSet
+  }
+
+  /**
+   * Gets a SystemStreamPartitionGrouper object from the configuration.
+   */
+  def getSystemStreamPartitionGrouper(config: Config) = {
+    val factoryString = config.getSystemStreamPartitionGrouperFactory
+    val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
+    factory.getSystemStreamPartitionGrouper(config)
+  }
+
+  /**
+   * Build a full Samza job model using the job configuration.
+   */
+  def buildJobModel(config: Config, containerCount: Int) = {
+    // TODO containerCount should go away when we generalize the job coordinator, 
+    // and have a non-yarn-specific way of specifying container count.
+    val checkpointManager = getCheckpointManager(config)
+    val allSystemStreamPartitions = getInputStreamPartitions(config)
+    val grouper = getSystemStreamPartitionGrouper(config)
+    val previousChangelogeMapping = if (checkpointManager != null) {
+      checkpointManager.start
+      checkpointManager.readChangeLogPartitionMapping
+    } else {
+      new util.HashMap[TaskName, java.lang.Integer]()
+    }
+    var maxChangelogPartitionId = previousChangelogeMapping
+      .values
+      .map(_.toInt)
+      .toList
+      .sorted
+      .lastOption
+      .getOrElse(-1)
+
+    // Assign all SystemStreamPartitions to TaskNames.
+    val taskModels = {
+      val groups = grouper.group(allSystemStreamPartitions)
+      info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+      groups
+        .map {
+          case (taskName, systemStreamPartitions) =>
+            val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
+              case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+              case _ =>
+                // If we've never seen this TaskName before, then assign it a 
+                // new changelog.
+                maxChangelogPartitionId += 1
+                info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
+                new Partition(maxChangelogPartitionId)
+            }
+            new TaskModel(taskName, systemStreamPartitions, changelogPartition)
+        }
+        .toSet
+    }
+
+    // Save the changelog mapping back to the checkpoint manager.
+    if (checkpointManager != null) {
+      // newChangelogMapping is the merging of all current task:changelog 
+      // assignments with whatever we had before (previousChangelogeMapping).
+      // We must persist legacy changelog assignments so that 
+      // maxChangelogPartitionId always has the absolute max, not the current 
+      // max (in case the task with the highest changelog partition mapping 
+      // disappears).
+      val newChangelogMapping = taskModels.map(taskModel => {
+        taskModel.getTaskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+      }).toMap ++ previousChangelogeMapping
+      info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
+      checkpointManager.writeChangeLogPartitionMapping(newChangelogMapping)
+      checkpointManager.stop
+    }
+
+    // Here is where we should put in a pluggable option for the 
+    // SSPTaskNameGrouper for locality, load-balancing, etc.
+    val containerGrouper = new GroupByContainerCount(containerCount)
+    val containerModels = containerGrouper
+      .group(taskModels)
+      .map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }
+      .toMap
+
+    new JobModel(config, containerModels)
+  }
+}
+
+/**
+ * <p>JobCoordinator is responsible for managing the lifecycle of a Samza job
+ * once it's been started. This includes starting and stopping containers,
+ * managing configuration, etc.</p>
+ *
+ * <p>Any new cluster manager that's integrated with Samza (YARN, Mesos, etc)
+ * must integrate with the job coordinator.</p>
+ *
+ * <p>This class' API is currently unstable, and likely to change. The
+ * coordinator's only responsibility right now is to propagate the job model
+ * and to manage the HTTP server that serves it.</p>
+ */
+class JobCoordinator(
+  /**
+   * The data model that describes the Samza job's containers and tasks.
+   */
+  val jobModel: JobModel,
+
+  /**
+   * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
+   */
+  val server: HttpServer) extends Logging {
+
+  debug("Got job model: %s." format jobModel)
+
+  def start {
+    debug("Starting HTTP server.")
+    server.start
+    info("Startd HTTP server: %s" format server.getUrl)
+  }
+
+  def stop {
+    debug("Stopping HTTP server.")
+    server.stop
+    info("Stopped HTTP server.")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
index 7c0676c..10986a4 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
@@ -32,23 +32,64 @@ import org.eclipse.jetty.servlet.ServletHolder
 import java.net.URL
 import org.apache.samza.util.Logging
 
+/**
+ * <p>A Jetty-based HTTP server. The server allows arbitrary servlets to be added
+ * with the addServlet() method. The server is configured to automatically
+ * serve static CSS and JS from the /css and /js directories if a
+ * resourceBasePath is specified.</p>
+ */
 class HttpServer(
+  /**
+   * All servlet paths will be served out of the rootPath. If rootPath is set
+   * to /foo, then all servlet paths will be served underneath /foo.
+   */
   rootPath: String = "/",
+
+  /**
+   * The port that Jetty should bind to. If set to 0, Jetty will bind to a
+   * dynamically allocated free port on the machine it's running on. The port
+   * can be retrieved by calling .getUrl.
+   */
   port: Int = 0,
+
+  /**
+   * If specified, tells Jetty where static resources are located inside
+   * WEB-INF. This allows HttpServer to serve arbitrary static files that are
+   * embedded in a JAR.
+   */
   resourceBasePath: String = null,
+
+  /**
+   * The ServletHolder to use for static file (CSS/JS) serving.
+   */
   defaultHolder: ServletHolder = new ServletHolder(classOf[DefaultServlet])) extends Logging {
 
+  var running = false
   var servlets = Map[String, Servlet]()
   val server = new Server(port)
   val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
 
   defaultHolder.setName("default")
 
+  /**
+   * <p>
+   * Add a servlet to the Jetty container. Path can be wild-carded (e.g. /\*
+   * or /foo/\*), and is relative to the rootPath specified in the constructor.
+   * </p>
+   *
+   * <p>
+   * Servlets with path /bar/\* and rootPath /foo will result in a location of
+   * http://localhost/foo/bar.
+   * </p>
+   */
   def addServlet(path: String, servlet: Servlet) {
     debug("Adding servlet %s to path %s" format (servlet, path))
     servlets += path -> servlet
   }
 
+  /**
+   * Start the Jetty server, and begin serving content.
+   */
   def start {
     debug("Starting server with rootPath=%s port=%s resourceBasePath=%s" format (rootPath, port, resourceBasePath))
     context.setContextPath(rootPath)
@@ -70,18 +111,30 @@ class HttpServer(
 
     debug("Starting HttpServer.")
     server.start()
+    running = true
     info("Started HttpServer on: %s" format getUrl)
   }
 
+  /**
+   * Shutdown the Jetty server.
+   */
   def stop {
+    running = false
     debug("Stopping server")
     context.stop()
     server.stop()
     info("Stopped server")
   }
 
+  /**
+   * Returns the URL for the root of the HTTP server. This method throws a
+   * SamzaException if the server is not currently running.
   def getUrl = {
-    val runningPort = server.getConnectors()(0).asInstanceOf[Connector].getLocalPort()
-    new URL("http://" + InetAddress.getLocalHost().getHostAddress() + ":" + runningPort + rootPath)
+    if (running) {
+      val runningPort = server.getConnectors()(0).asInstanceOf[Connector].getLocalPort()
+      new URL("http://" + InetAddress.getLocalHost().getHostAddress() + ":" + runningPort + rootPath)
+    } else {
+      throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index d7841a6..635c353 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -19,50 +19,12 @@
 
 package org.apache.samza.coordinator.server
 
-import org.apache.samza.config.Config
-import java.util.HashMap
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
-import org.apache.samza.util.JsonHelpers
-import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.JobModel
 import org.apache.samza.util.Logging
 
-object JobServlet {
-  val CONFIG = "config"
-  val CONTAINERS = "containers"
-  val TASK_CHANGELOG_MAPPING = "task-changelog-mappings"
-}
-
-class JobServlet(
-  config: Config,
-  containerToTaskMapping: Map[Int, TaskNamesToSystemStreamPartitions],
-  taskToChangelogMapping: Map[TaskName, Int]) extends ServletBase with Logging {
-  import JobServlet._
-  import JsonHelpers._
-
-  val javaSafeContainerToTaskMapping = buildTasksToSSPs
-  val javaSafeTaskToChangelogMappings = convertTaskNameToChangeLogPartitionMapping(taskToChangelogMapping)
-  val jsonMap = buildJsonMap
-
-  debug("Built JSON map: %s" format jsonMap)
-
-  protected def getObjectToWrite() = {
-    jsonMap
-  }
-
-  private def buildTasksToSSPs = {
-    val map = new HashMap[java.lang.Integer, java.util.HashMap[TaskName, java.util.ArrayList[SSPWrapper]]]
-    containerToTaskMapping.foreach {
-      case (containerId, taskNameToSSPs) =>
-        map.put(Integer.valueOf(containerId), convertSystemStreamPartitionSet(taskNameToSSPs.getJavaFriendlyType))
-    }
-    map
-  }
-
-  private def buildJsonMap = {
-    val map = new HashMap[String, Object]()
-    map.put(CONFIG, config)
-    map.put(CONTAINERS, javaSafeContainerToTaskMapping)
-    map.put(TASK_CHANGELOG_MAPPING, javaSafeTaskToChangelogMappings)
-    map
-  }
+/**
+ * A servlet that serves a Samza job's JobModel, which ServletBase writes out as JSON.
+ */
+class JobServlet(jobModel: JobModel) extends ServletBase with Logging {
+  protected def getObjectToWrite() = jobModel
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
index c9bad90..2732cca 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
@@ -19,25 +19,29 @@
 
 package org.apache.samza.coordinator.server;
 
-import java.io.IOException;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import java.io.IOException
+import javax.servlet.ServletException
+import javax.servlet.http.HttpServlet
+import javax.servlet.http.HttpServletRequest
+import javax.servlet.http.HttpServletResponse
 import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.samza.serializers.model.SamzaObjectMapper
 
-object ServletBase {
-  val JSON_MAPPER = new ObjectMapper()
-}
-
+/**
+ * A simple servlet helper that makes it easy to dump objects to JSON.
+ */
 trait ServletBase extends HttpServlet {
-  import ServletBase._
+  val mapper = SamzaObjectMapper.getObjectMapper()
 
   override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) {
     response.setContentType("application/json")
     response.setStatus(HttpServletResponse.SC_OK)
-    JSON_MAPPER.writeValue(response.getWriter(), getObjectToWrite())
+    mapper.writeValue(response.getWriter(), getObjectToWrite())
   }
 
+  /**
+   * Returns an object that should be fed to Jackson's ObjectMapper, and 
+   * returned as an HTTP response.
+   */
   protected def getObjectToWrite(): Object
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
index bd38955..7992885 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -32,18 +32,15 @@ import java.io.InputStreamReader
 import java.io.InputStream
 import java.io.OutputStream
 import org.apache.samza.SamzaException
-import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.job.CommandBuilder
 import scala.collection.JavaConversions._
 
-class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpServer) extends StreamJob with Logging {
+class ProcessJob(commandBuilder: CommandBuilder) extends StreamJob with Logging {
   var jobStatus: Option[ApplicationStatus] = None
   var process: Process = null
 
   def submit: StreamJob = {
     jobStatus = Some(New)
-    server.start
-    commandBuilder.setUrl(server.getUrl)
     val waitForThreadStart = new CountDownLatch(1)
     val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList)
 
@@ -66,7 +63,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
         errThread.start
         waitForThreadStart.countDown
         process.waitFor
-        shutdown
       }
     }
 
@@ -79,7 +75,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
   def kill: StreamJob = {
     process.destroy
     jobStatus = Some(UnsuccessfulFinish);
-    shutdown
     ProcessJob.this
   }
 
@@ -90,7 +85,7 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
         try {
           process.waitFor
         } catch {
-          case e: InterruptedException => shutdown
+          case e: InterruptedException => info("Got interrupt.", e)
         }
       }
     }
@@ -112,10 +107,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
   }
 
   def getStatus = jobStatus.getOrElse(null)
-
-  private def shutdown {
-    server.stop
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index b1e5237..6985af6 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -19,7 +19,7 @@
 
 package org.apache.samza.job.local
 
-import org.apache.samza.container.{ TaskNamesToSystemStreamPartitions, SamzaContainer }
+import org.apache.samza.container.SamzaContainer
 import org.apache.samza.util.Logging
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
@@ -29,48 +29,40 @@ import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.JobCoordinator
 
 /**
- * Creates a stand alone ProcessJob with the specified config
+ * Creates a stand alone ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
-    // Since we're local, there will only be a single task into which all the SSPs will be processed
-    val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
-    if (taskToTaskNames.size != 1) {
-      throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
-    }
-
-    // So pull out that single TaskNamesToSystemStreamPartitions
-    val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
-    if (sspTaskName.size <= 0) {
-      throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
-    }
-
-    val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames)
-    info("got taskName for job %s" format sspTaskName)
-
-    val server = new HttpServer()
-    server.addServlet("/*", new JobServlet(config, taskToTaskNames, taskNameToChangeLogPartitionMapping))
+    val coordinator = JobCoordinator(config, 1)
+    val containerModel = coordinator.jobModel.getContainers.get(0)
 
-    val commandBuilder: CommandBuilder = {
-      config.getCommandClass match {
-        case Some(cmdBuilderClassName) => {
-          // A command class was specified, so we need to use a process job to
-          // execute the command in its own process.
-          Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
-        }
-        case _ => {
-          info("Defaulting to ShellCommandBuilder")
-          new ShellCommandBuilder
+    try {
+      val commandBuilder = {
+        config.getCommandClass match {
+          case Some(cmdBuilderClassName) => {
+            // A command class was specified, so we need to use a process job to
+            // execute the command in its own process.
+            Util.getObj[CommandBuilder](cmdBuilderClassName)
+          }
+          case _ => {
+            info("Defaulting to ShellCommandBuilder")
+            new ShellCommandBuilder
+          }
         }
       }
-    }
 
-    commandBuilder
-      .setConfig(config)
-      .setId(0)
+      commandBuilder
+        .setConfig(config)
+        .setId(0)
 
-    new ProcessJob(commandBuilder, server)
+      coordinator.start
+
+      new ProcessJob(commandBuilder)
+    } finally {
+      coordinator.stop
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 4d5f0d5..7504e6d 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -24,31 +24,20 @@ import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.config.TaskConfig._
-import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer}
-import org.apache.samza.job.{StreamJob, StreamJobFactory}
+import org.apache.samza.container.SamzaContainer
+import org.apache.samza.job.{ StreamJob, StreamJobFactory }
 import org.apache.samza.util.Util
 import org.apache.samza.config.JobConfig._
+import org.apache.samza.coordinator.JobCoordinator
 
 /**
  * Creates a new Thread job with the given config
  */
 class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
-    // Since we're local, there will only be a single task into which all the SSPs will be processed
-    val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
-    if(taskToTaskNames.size != 1) {
-      throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
-    }
-
-    // So pull out that single TaskNamesToSystemStreamPartitions
-    val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
-    if (sspTaskName.size <= 0) {
-      throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
-    }
-
-    val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames)
-    info("got taskName for job %s" format sspTaskName)
     info("Creating a ThreadJob, which is only meant for debugging.")
+    val coordinator = JobCoordinator(config, 1)
+    val containerModel = coordinator.jobModel.getContainers.get(0)
 
     // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
     config.getTaskOpts match {
@@ -56,8 +45,11 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       case _ => None
     }
 
-    // No command class was specified, so execute the job in this process
-    // using a threaded job.
-    new ThreadJob(SamzaContainer(0, sspTaskName, taskNameToChangeLogPartitionMapping, config))
+    try {
+      coordinator.start
+      new ThreadJob(SamzaContainer(containerModel, config))
+    } finally {
+      coordinator.stop
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala b/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala
deleted file mode 100644
index e3f23b6..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import org.apache.samza.container.TaskName
-import org.codehaus.jackson.map.ObjectMapper
-import org.apache.samza.system.SystemStreamPartition
-import org.codehaus.jackson.`type`.TypeReference
-import java.util
-import scala.collection.JavaConversions._
-import org.apache.samza.Partition
-import scala.reflect.BeanProperty
-import org.apache.samza.config.MapConfig
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
-
-/**
- * Working with Jackson and JSON in Scala is tricky. These helper methods are
- * used to convert objects back and forth in SamzaContainer, and the
- * JobServlet.
- */
-object JsonHelpers {
-  // Jackson really hates Scala's classes, so we need to wrap up the SSP in a 
-  // form Jackson will take.
-  class SSPWrapper(@BeanProperty var partition: java.lang.Integer = null,
-    @BeanProperty var Stream: java.lang.String = null,
-    @BeanProperty var System: java.lang.String = null) {
-    def this() { this(null, null, null) }
-    def this(ssp: SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem) }
-  }
-
-  def convertSystemStreamPartitionSet(sspTaskNames: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]): util.HashMap[TaskName, util.ArrayList[SSPWrapper]] = {
-    val map = new util.HashMap[TaskName, util.ArrayList[SSPWrapper]]()
-    for ((key, ssps) <- sspTaskNames) {
-      val al = new util.ArrayList[SSPWrapper](ssps.size)
-      for (ssp <- ssps) { al.add(new SSPWrapper(ssp)) }
-      map.put(key, al)
-    }
-    map
-  }
-
-  def convertTaskNameToChangeLogPartitionMapping(mapping: Map[TaskName, Int]): util.HashMap[TaskName, java.lang.Integer] = {
-    val javaMap = new util.HashMap[TaskName, java.lang.Integer]()
-    mapping.foreach(kv => javaMap.put(kv._1, Integer.valueOf(kv._2)))
-    javaMap
-  }
-
-  def deserializeCoordinatorBody(body: String) = new ObjectMapper().readValue(body, new TypeReference[util.HashMap[String, Object]] {}).asInstanceOf[util.HashMap[String, Object]]
-
-  def convertCoordinatorConfig(config: util.Map[String, String]) = new MapConfig(config)
-
-  def convertCoordinatorTaskNameChangelogPartitions(taskNameToChangelogMapping: util.Map[String, java.lang.Integer]) = {
-    taskNameToChangelogMapping.map {
-      case (taskName, changelogPartitionId) =>
-        (new TaskName(taskName), changelogPartitionId.toInt)
-    }.toMap
-  }
-
-  // First key is containerId, second key is TaskName, third key is 
-  // [system|stream|partition].
-  def convertCoordinatorSSPTaskNames(containers: util.Map[String, util.Map[String, util.List[util.Map[String, Object]]]]): Map[Int, TaskNamesToSystemStreamPartitions] = {
-    containers.map {
-      case (containerId, tasks) => {
-        containerId.toInt -> new TaskNamesToSystemStreamPartitions(tasks.map {
-          case (taskName, ssps) => {
-            new TaskName(taskName) -> ssps.map {
-              case (sspMap) => new SystemStreamPartition(
-                sspMap.get("system").toString,
-                sspMap.get("stream").toString,
-                new Partition(sspMap.get("partition").toString.toInt))
-            }.toSet
-          }
-        }.toMap)
-      }
-    }.toMap
-  }
-}
\ No newline at end of file