You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/28 22:08:38 UTC
[2/2] git commit: SAMZA-444;
provide a samza job data model for job coordinator
SAMZA-444; provide a samza job data model for job coordinator
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/6f595bed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/6f595bed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/6f595bed
Branch: refs/heads/master
Commit: 6f595beda2482ab85c47f40ae8345c2591007367
Parents: f6d3415
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Oct 28 09:04:14 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Oct 28 09:04:14 2014 -0700
----------------------------------------------------------------------
.../apache/samza/job/model/ContainerModel.java | 92 ++++++++
.../org/apache/samza/job/model/JobModel.java | 90 ++++++++
.../org/apache/samza/job/model/TaskModel.java | 108 +++++++++
.../model/JsonContainerModelMixIn.java | 42 ++++
.../serializers/model/JsonJobModelMixIn.java | 41 ++++
.../serializers/model/JsonTaskModelMixIn.java | 45 ++++
.../serializers/model/SamzaObjectMapper.java | 198 ++++++++++++++++
.../samza/checkpoint/CheckpointTool.scala | 9 +-
.../serializers/JsonConfigSerializer.scala | 39 ----
.../apache/samza/container/SamzaContainer.scala | 75 ++++---
.../TaskNamesToSystemStreamPartitions.scala | 145 ------------
.../grouper/task/GroupByContainerCount.scala | 47 ++--
.../grouper/task/TaskNameGrouper.scala | 35 ++-
.../samza/coordinator/JobCoordinator.scala | 225 +++++++++++++++++++
.../samza/coordinator/server/HttpServer.scala | 57 ++++-
.../samza/coordinator/server/JobServlet.scala | 50 +----
.../samza/coordinator/server/ServletBase.scala | 26 ++-
.../org/apache/samza/job/local/ProcessJob.scala | 13 +-
.../samza/job/local/ProcessJobFactory.scala | 60 +++--
.../samza/job/local/ThreadJobFactory.scala | 30 +--
.../org/apache/samza/util/JsonHelpers.scala | 93 --------
.../main/scala/org/apache/samza/util/Util.scala | 225 ++-----------------
.../model/TestSamzaObjectMapper.java | 59 +++++
.../samza/container/TestSamzaContainer.scala | 64 +++---
.../TestTaskNamesToSystemStreamPartitions.scala | 71 ------
.../task/TestGroupByContainerCount.scala | 65 ++++--
.../samza/coordinator/TestJobCoordinator.scala | 119 ++++++++++
.../scala/org/apache/samza/util/TestUtil.scala | 70 +-----
.../resources/scalate/WEB-INF/views/index.scaml | 10 +-
.../apache/samza/job/yarn/SamzaAppMaster.scala | 7 +-
.../samza/job/yarn/SamzaAppMasterService.scala | 12 +-
.../samza/job/yarn/SamzaAppMasterState.scala | 7 +-
.../job/yarn/SamzaAppMasterTaskManager.scala | 15 +-
.../org/apache/samza/job/yarn/YarnJob.scala | 4 +-
.../webapp/ApplicationMasterRestServlet.scala | 29 ++-
.../job/yarn/TestSamzaAppMasterService.scala | 16 +-
.../yarn/TestSamzaAppMasterTaskManager.scala | 36 +--
37 files changed, 1393 insertions(+), 936 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
new file mode 100644
index 0000000..98a34bc
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.samza.container.TaskName;
+
+/**
+ * <p>
+ * The data model used to define which TaskModels a SamzaContainer should
+ * process. The model is used in the job coordinator and SamzaContainer to
+ * determine how to execute Samza jobs.
+ * </p>
+ *
+ * <p>
+ * The hierarchy of a Samza job's data model is that jobs have containers, and
+ * containers have tasks. Each data model contains relevant information, such as
+ * an id, partition information, etc.
+ * </p>
+ */
+public class ContainerModel implements Comparable<ContainerModel> {
+  private final int containerId;
+  private final Map<TaskName, TaskModel> tasks;
+
+  /**
+   * @param containerId The id of this container.
+   * @param tasks The tasks this container processes, keyed by task name. Must
+   *          not be null: it is wrapped with Collections.unmodifiableMap, which
+   *          rejects null. The map is wrapped, not copied, so later changes the
+   *          caller makes to it remain visible through getTasks().
+   */
+  public ContainerModel(int containerId, Map<TaskName, TaskModel> tasks) {
+    this.containerId = containerId;
+    this.tasks = Collections.unmodifiableMap(tasks);
+  }
+
+  public int getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * @return An unmodifiable view of this container's tasks, keyed by task name.
+   */
+  public Map<TaskName, TaskModel> getTasks() {
+    return tasks;
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerModel [containerId=" + containerId + ", tasks=" + tasks + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + containerId;
+    result = prime * result + ((tasks == null) ? 0 : tasks.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ContainerModel other = (ContainerModel) obj;
+    if (containerId != other.containerId)
+      return false;
+    if (tasks == null) {
+      if (other.tasks != null)
+        return false;
+    } else if (!tasks.equals(other.tasks))
+      return false;
+    return true;
+  }
+
+  /**
+   * Orders containers by ascending container id.
+   */
+  @Override
+  public int compareTo(ContainerModel other) {
+    // Compare explicitly instead of returning containerId - otherId: that
+    // subtraction overflows, and yields the wrong sign, when the two ids are
+    // far apart (e.g. Integer.MIN_VALUE vs. 1).
+    int otherId = other.getContainerId();
+    if (containerId < otherId) {
+      return -1;
+    }
+    return (containerId == otherId) ? 0 : 1;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
new file mode 100644
index 0000000..c2b49c4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.samza.config.Config;
+
+/**
+ * <p>
+ * Describes an entire Samza job: the job's configuration plus the containers
+ * that run it, keyed by container id. The job coordinator and SamzaContainer
+ * use this model to decide how a Samza job executes.
+ * </p>
+ *
+ * <p>
+ * A Samza job's data model is hierarchical: a job has containers, and each
+ * container has tasks. Each level carries its own details, such as ids and
+ * partition assignments.
+ * </p>
+ */
+public class JobModel {
+  private final Config config;
+  private final Map<Integer, ContainerModel> containers;
+
+  /**
+   * @param config The job's configuration.
+   * @param containers The job's containers keyed by container id. Must not be
+   *          null (Collections.unmodifiableMap rejects null). It is exposed via
+   *          getContainers() as an unmodifiable view, not a copy.
+   */
+  public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+    this.config = config;
+    this.containers = Collections.unmodifiableMap(containers);
+  }
+
+  public Config getConfig() {
+    return config;
+  }
+
+  public Map<Integer, ContainerModel> getContainers() {
+    return containers;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("JobModel [config=")
+        .append(config)
+        .append(", containers=")
+        .append(containers)
+        .append("]")
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    // 31-based combination of the fields, in the order config, containers.
+    int hash = 31 + hashOf(config);
+    return 31 * hash + hashOf(containers);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    JobModel that = (JobModel) obj;
+    return nullSafeEquals(config, that.config) && nullSafeEquals(containers, that.containers);
+  }
+
+  /** Hash code of a possibly-null value; null hashes to 0. */
+  private static int hashOf(Object value) {
+    return (value == null) ? 0 : value.hashCode();
+  }
+
+  /** Equality of two possibly-null values; two nulls are considered equal. */
+  private static boolean nullSafeEquals(Object left, Object right) {
+    return (left == null) ? (right == null) : left.equals(right);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
new file mode 100644
index 0000000..eb22d2e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * <p>
+ * Describes a single task: its name, the input SystemStreamPartitions it
+ * consumes, and its changelog partition. The job coordinator and
+ * SamzaContainer use this model to decide how a Samza job executes.
+ * </p>
+ *
+ * <p>
+ * A Samza job's data model is hierarchical: a job has containers, and each
+ * container has tasks. Each level carries its own details, such as ids and
+ * partition assignments.
+ * </p>
+ */
+public class TaskModel implements Comparable<TaskModel> {
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> systemStreamPartitions;
+  private final Partition changelogPartition;
+
+  /**
+   * @param taskName The name of this task.
+   * @param systemStreamPartitions The input partitions assigned to this task.
+   *          Must not be null (Collections.unmodifiableSet rejects null). It is
+   *          exposed as an unmodifiable view, not a copy.
+   * @param changelogPartition The changelog partition for this task.
+   */
+  public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+    this.taskName = taskName;
+    this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
+    this.changelogPartition = changelogPartition;
+  }
+
+  public TaskName getTaskName() {
+    return taskName;
+  }
+
+  public Set<SystemStreamPartition> getSystemStreamPartitions() {
+    return systemStreamPartitions;
+  }
+
+  public Partition getChangelogPartition() {
+    return changelogPartition;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("TaskModel [taskName=")
+        .append(taskName)
+        .append(", systemStreamPartitions=")
+        .append(systemStreamPartitions)
+        .append(", changeLogPartition=")
+        .append(changelogPartition)
+        .append("]")
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    // 31-based combination in the order: changelog partition, input
+    // partitions, task name.
+    int hash = 31 + hashOf(changelogPartition);
+    hash = 31 * hash + hashOf(systemStreamPartitions);
+    return 31 * hash + hashOf(taskName);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    TaskModel that = (TaskModel) obj;
+    return nullSafeEquals(changelogPartition, that.changelogPartition)
+        && nullSafeEquals(systemStreamPartitions, that.systemStreamPartitions)
+        && nullSafeEquals(taskName, that.taskName);
+  }
+
+  /**
+   * Orders tasks by their TaskName.
+   */
+  @Override
+  public int compareTo(TaskModel other) {
+    return this.taskName.compareTo(other.getTaskName());
+  }
+
+  /** Hash code of a possibly-null value; null hashes to 0. */
+  private static int hashOf(Object value) {
+    return (value == null) ? 0 : value.hashCode();
+  }
+
+  /** Equality of two possibly-null values; two nulls are considered equal. */
+  private static boolean nullSafeEquals(Object left, Object right) {
+    return (left == null) ? (right == null) : left.equals(right);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
new file mode 100644
index 0000000..f197a95
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.util.Map;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskModel;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert Samza's ContainerModel to/from JSON.
+ */
+public abstract class JsonContainerModelMixIn {
+ @JsonCreator
+ public JsonContainerModelMixIn(@JsonProperty("container-id") int containerId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
+ }
+
+ @JsonProperty("container-id")
+ abstract int getContainerId();
+
+ @JsonProperty("tasks")
+ abstract Map<TaskName, TaskModel> getTasks();
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
new file mode 100644
index 0000000..037b5e2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.ContainerModel;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert Samza's JobModel to/from JSON.
+ */
+public abstract class JsonJobModelMixIn {
+ @JsonCreator
+ public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<Integer, ContainerModel> containers) {
+ }
+
+ @JsonProperty("config")
+ abstract Config getConfig();
+
+ @JsonProperty("containers")
+ abstract Map<Integer, ContainerModel> getContainers();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
new file mode 100644
index 0000000..7dc431c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A mix-in Jackson class to convert Samza's TaskModel to/from JSON.
+ */
+public abstract class JsonTaskModelMixIn {
+ @JsonCreator
+ public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
+ }
+
+ @JsonProperty("task-name")
+ abstract TaskName getTaskName();
+
+ @JsonProperty("system-stream-partitions")
+ abstract Set<SystemStreamPartition> getSystemStreamPartitions();
+
+ @JsonProperty("changelog-partition")
+ abstract Partition getChangelogPartition();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
new file mode 100644
index 0000000..3517912
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.ObjectCodec;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.DeserializationContext;
+import org.codehaus.jackson.map.JsonDeserializer;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.MapperConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.introspect.AnnotatedField;
+import org.codehaus.jackson.map.introspect.AnnotatedMethod;
+import org.codehaus.jackson.map.module.SimpleModule;
+import org.codehaus.jackson.type.TypeReference;
+
+/**
+ * <p>
+ * A collection of utility classes and (de)serializers to make Samza's job model
+ * work with Jackson. Rather than annotating Samza's job model directly, the
+ * Jackson-specific code is isolated so that Samza's core data model does not
+ * require a direct dependency on Jackson.
+ * </p>
+ *
+ * <p>
+ * To use Samza's job data model, use the SamzaObjectMapper.getObjectMapper()
+ * method.
+ * </p>
+ */
+public class SamzaObjectMapper {
+  // Shared mapper used by ConfigDeserializer to read a JSON node into a
+  // Map<String, String>.
+  private static final ObjectMapper OBJECT_MAPPER = getObjectMapper();
+
+  /**
+   * @return Returns a new ObjectMapper that's been configured to (de)serialize
+   *         Samza's job data model, and simple data types such as TaskName,
+   *         Partition, Config, and SystemStreamPartition.
+   */
+  public static ObjectMapper getObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    SimpleModule module = new SimpleModule("SamzaModule", new Version(1, 0, 0, ""));
+
+    // Setup custom serdes for simple data types. Each type that has a custom
+    // serializer also gets its matching custom deserializer, so values written
+    // by this mapper are read back through the same code path.
+    module.addSerializer(Partition.class, new PartitionSerializer());
+    module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
+    module.addSerializer(TaskName.class, new TaskNameSerializer());
+    module.addDeserializer(Partition.class, new PartitionDeserializer());
+    module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
+    // Previously TaskNameDeserializer was defined but never registered.
+    // NOTE(review): this covers TaskName values only; TaskName map keys (e.g.
+    // ContainerModel's tasks map) go through Jackson's key deserialization.
+    module.addDeserializer(TaskName.class, new TaskNameDeserializer());
+    module.addDeserializer(Config.class, new ConfigDeserializer());
+
+    // Setup mixins for data models.
+    mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
+    mapper.getSerializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
+    mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+
+    // Convert camel case to hyphenated field names, and register the module.
+    mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
+    mapper.registerModule(module);
+
+    return mapper;
+  }
+
+  /**
+   * Reads a JSON object into a Map<String, String> and wraps it in a MapConfig.
+   */
+  public static class ConfigDeserializer extends JsonDeserializer<Config> {
+    @Override
+    public Config deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      ObjectCodec oc = jsonParser.getCodec();
+      JsonNode node = oc.readTree(jsonParser);
+      return new MapConfig(OBJECT_MAPPER.<Map<String, String>> readValue(node, new TypeReference<Map<String, String>>() {
+      }));
+    }
+  }
+
+  /**
+   * Writes a Partition as its bare integer partition id.
+   */
+  public static class PartitionSerializer extends JsonSerializer<Partition> {
+    @Override
+    public void serialize(Partition partition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeObject(Integer.valueOf(partition.getPartitionId()));
+    }
+  }
+
+  /**
+   * Reads a bare integer back into a Partition.
+   */
+  public static class PartitionDeserializer extends JsonDeserializer<Partition> {
+    @Override
+    public Partition deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      ObjectCodec oc = jsonParser.getCodec();
+      JsonNode node = oc.readTree(jsonParser);
+      return new Partition(node.getIntValue());
+    }
+  }
+
+  /**
+   * Writes a TaskName as its toString() value.
+   */
+  public static class TaskNameSerializer extends JsonSerializer<TaskName> {
+    @Override
+    public void serialize(TaskName taskName, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeObject(taskName.toString());
+    }
+  }
+
+  /**
+   * Reads a JSON string back into a TaskName.
+   */
+  public static class TaskNameDeserializer extends JsonDeserializer<TaskName> {
+    @Override
+    public TaskName deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      ObjectCodec oc = jsonParser.getCodec();
+      JsonNode node = oc.readTree(jsonParser);
+      return new TaskName(node.getTextValue());
+    }
+  }
+
+  /**
+   * Writes a SystemStreamPartition as an object with "system", "stream" and
+   * "partition" fields.
+   */
+  public static class SystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> {
+    @Override
+    public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
+      Map<String, Object> systemStreamPartitionMap = new HashMap<String, Object>();
+      systemStreamPartitionMap.put("system", systemStreamPartition.getSystem());
+      systemStreamPartitionMap.put("stream", systemStreamPartition.getStream());
+      systemStreamPartitionMap.put("partition", systemStreamPartition.getPartition());
+      jsonGenerator.writeObject(systemStreamPartitionMap);
+    }
+  }
+
+  /**
+   * Reads an object with "system", "stream" and integer "partition" fields
+   * back into a SystemStreamPartition.
+   */
+  public static class SystemStreamPartitionDeserializer extends JsonDeserializer<SystemStreamPartition> {
+    @Override
+    public SystemStreamPartition deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException {
+      ObjectCodec oc = jsonParser.getCodec();
+      JsonNode node = oc.readTree(jsonParser);
+      String system = node.get("system").getTextValue();
+      String stream = node.get("stream").getTextValue();
+      Partition partition = new Partition(node.get("partition").getIntValue());
+      return new SystemStreamPartition(system, stream, partition);
+    }
+  }
+
+  /**
+   * A Jackson property naming strategy that converts camel case JSON fields to
+   * hyphenated names. For example, myVariableName would be converted to
+   * my-variable-name.
+   */
+  public static class CamelCaseToDashesStrategy extends PropertyNamingStrategy {
+    @Override
+    public String nameForField(MapperConfig<?> config, AnnotatedField field, String defaultName) {
+      return convert(defaultName);
+    }
+
+    @Override
+    public String nameForGetterMethod(MapperConfig<?> config, AnnotatedMethod method, String defaultName) {
+      return convert(defaultName);
+    }
+
+    @Override
+    public String nameForSetterMethod(MapperConfig<?> config, AnnotatedMethod method, String defaultName) {
+      return convert(defaultName);
+    }
+
+    /**
+     * Lower-cases every upper-case character and precedes it with a dash,
+     * except at the start of the name (so "Foo" becomes "foo", not "-foo").
+     *
+     * @param defaultName The camel-case property name.
+     * @return The hyphenated, lower-case property name.
+     */
+    public String convert(String defaultName) {
+      StringBuilder builder = new StringBuilder(defaultName.length() + 4);
+      for (int i = 0; i < defaultName.length(); ++i) {
+        char c = defaultName.charAt(i);
+        if (Character.isUpperCase(c)) {
+          if (i > 0) {
+            builder.append('-');
+          }
+          builder.append(Character.toLowerCase(c));
+        } else {
+          builder.append(c);
+        }
+      }
+      return builder.toString();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 64a5078..ddc30af 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -32,6 +32,7 @@ import org.apache.samza.util.{CommandLine, Util}
import org.apache.samza.{Partition, SamzaException}
import scala.collection.JavaConversions._
import org.apache.samza.util.Logging
+import org.apache.samza.coordinator.JobCoordinator
/**
* Command-line tool for inspecting and manipulating the checkpoints for a job.
@@ -136,7 +137,13 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extend
info("Using %s" format manager)
// Find all the TaskNames that would be generated for this job config
- val taskNames = Util.assignContainerToSSPTaskNames(config, 1).get(0).get.keys.toSet
+ val coordinator = JobCoordinator(config, 1)
+ val taskNames = coordinator
+ .jobModel
+ .getContainers
+ .values
+ .flatMap(_.getTasks.keys)
+ .toSet
taskNames.foreach(manager.register)
manager.start
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala b/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
deleted file mode 100644
index 60e65ea..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/serializers/JsonConfigSerializer.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config.serializers
-import scala.collection.JavaConversions._
-
-import org.codehaus.jackson.map.ObjectMapper
-
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-
-import java.util.HashMap
-
-object JsonConfigSerializer {
- val jsonMapper = new ObjectMapper()
-
- def fromJson(string: String): Config = {
- val map = jsonMapper.readValue(string, classOf[HashMap[String, String]])
- new MapConfig(map)
- }
-
- def toJson(config: Config) = jsonMapper.writeValueAsString(new HashMap[String, String](config))
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d0c9004..5885a88 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -31,7 +31,6 @@ import org.apache.samza.config.StorageConfig.Config2Storage
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.serializers.JsonConfigSerializer
import org.apache.samza.metrics.JmxServer
import org.apache.samza.metrics.JvmMetrics
import org.apache.samza.metrics.MetricsRegistryMap
@@ -60,9 +59,12 @@ import org.apache.samza.task.TaskLifecycleListenerFactory
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
import scala.collection.JavaConversions._
-import org.apache.samza.util.JsonHelpers
import java.net.URL
import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.job.model.JobModel
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -76,9 +78,11 @@ object SamzaContainer extends Logging {
try {
val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
- val (config, sspTaskNames, taskNameToChangeLogPartitionMapping) = getCoordinatorObjects(coordinatorUrl)
+ val jobModel = readJobModel(coordinatorUrl)
+ val containerModel = jobModel.getContainers()(containerId.toInt)
+ val config = jobModel.getConfig
- SamzaContainer(containerId, sspTaskNames(containerId), taskNameToChangeLogPartitionMapping, config).run
+ SamzaContainer(containerModel, config).run
} finally {
jmxServer.stop
}
@@ -89,34 +93,41 @@ object SamzaContainer extends Logging {
* assignments, and returns objects to be used for SamzaContainer's
* constructor.
*/
- def getCoordinatorObjects(coordinatorUrl: String) = {
- info("Fetching configuration from: %s" format coordinatorUrl)
- val rawCoordinatorObjects = JsonHelpers.deserializeCoordinatorBody(Util.read(new URL(coordinatorUrl)))
- val rawConfig = rawCoordinatorObjects.get(JobServlet.CONFIG).asInstanceOf[java.util.Map[String, String]]
- val rawContainers = rawCoordinatorObjects.get(JobServlet.CONTAINERS).asInstanceOf[java.util.Map[String, java.util.Map[String, java.util.List[java.util.Map[String, Object]]]]]
- val rawTaskChangelogMapping = rawCoordinatorObjects.get(JobServlet.TASK_CHANGELOG_MAPPING).asInstanceOf[java.util.Map[String, java.lang.Integer]]
- val config = JsonHelpers.convertCoordinatorConfig(rawConfig)
- val sspTaskNames = JsonHelpers.convertCoordinatorSSPTaskNames(rawContainers)
- val taskNameToChangeLogPartitionMapping = JsonHelpers.convertCoordinatorTaskNameChangelogPartitions(rawTaskChangelogMapping)
- (config, sspTaskNames, taskNameToChangeLogPartitionMapping)
+ def readJobModel(url: String) = {
+ info("Fetching configuration from: %s" format url)
+ SamzaObjectMapper
+ .getObjectMapper
+ .readValue(Util.read(new URL(url)), classOf[JobModel])
}
- def apply(containerId: Int, sspTaskNames: TaskNamesToSystemStreamPartitions, taskNameToChangeLogPartitionMapping: Map[TaskName, Int], config: Config) = {
+ def apply(containerModel: ContainerModel, config: Config) = {
+ val containerId = containerModel.getContainerId
val containerName = "samza-container-%s" format containerId
val containerPID = Util.getContainerPID
info("Setting up Samza container: %s" format containerName)
info("Samza container PID: %s" format containerPID)
info("Using configuration: %s" format config)
- info("Using tasks: %s" format sspTaskNames)
- info("Using task changelogs: %s" format taskNameToChangeLogPartitionMapping)
+ info("Using container model: %s" format containerModel)
val registry = new MetricsRegistryMap(containerName)
val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry)
val systemProducersMetrics = new SystemProducersMetrics(registry)
val systemConsumersMetrics = new SystemConsumersMetrics(registry)
- val inputSystems = sspTaskNames.getAllSystems()
+ val inputSystemStreamPartitions = containerModel
+ .getTasks
+ .values
+ .flatMap(_.getSystemStreamPartitions)
+ .toSet
+
+ val inputSystemStreams = inputSystemStreamPartitions
+ .map(_.getSystemStream)
+ .toSet
+
+ val inputSystems = inputSystemStreams
+ .map(_.getSystem)
+ .toSet
val systemNames = config.getSystemNames
@@ -144,7 +155,7 @@ object SamzaContainer extends Logging {
info("Got system factories: %s" format systemFactories.keys)
val streamMetadataCache = new StreamMetadataCache(systemAdmins)
- val inputStreamMetadata = streamMetadataCache.getStreamMetadata(sspTaskNames.getAllSystemStreams)
+ val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams)
info("Got input stream metadata: %s" format inputStreamMetadata)
@@ -213,7 +224,7 @@ object SamzaContainer extends Logging {
* A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps.
*/
val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => {
- (serdeStreams ++ sspTaskNames.getAllSSPs())
+ (serdeStreams ++ inputSystemStreamPartitions)
.filter(systemStream => getSerdeName(systemStream).isDefined)
.map(systemStream => {
val serdeName = getSerdeName(systemStream).get
@@ -380,12 +391,18 @@ object SamzaContainer extends Logging {
// Wire up all task-instance-level (unshared) objects.
- val taskNames = sspTaskNames.keys.toSet
+ val taskNames = containerModel
+ .getTasks
+ .values
+ .map(_.getTaskName)
+ .toSet
val containerContext = new SamzaContainerContext(containerId, config, taskNames)
- val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => {
- debug("Setting up task instance: %s" format taskName)
+ val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
+ debug("Setting up task instance: %s" format taskModel)
+
+ val taskName = taskModel.getTaskName
val task = Util.getObj[StreamTask](taskClassName)
@@ -404,13 +421,11 @@ object SamzaContainer extends Logging {
info("Got store consumers: %s" format storeConsumers)
- val partitionForThisTaskName = new Partition(taskNameToChangeLogPartitionMapping(taskName))
-
val taskStores = storageEngineFactories
.map {
case (storeName, storageEngineFactory) =>
val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) {
- new SystemStreamPartition(changeLogSystemStreams(storeName), partitionForThisTaskName)
+ new SystemStreamPartition(changeLogSystemStreams(storeName), taskModel.getChangelogPartition)
} else {
null
}
@@ -437,7 +452,7 @@ object SamzaContainer extends Logging {
info("Got task stores: %s" format taskStores)
- val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata)
+ val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(taskModel.getChangelogPartition, changeLogMetadata)
info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
@@ -448,9 +463,11 @@ object SamzaContainer extends Logging {
changeLogSystemStreams = changeLogSystemStreams,
changeLogOldestOffsets = changeLogOldestOffsets,
storeBaseDir = storeBaseDir,
- partitionForThisTaskName)
+ partition = taskModel.getChangelogPartition)
- val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames))
+ val systemStreamPartitions = taskModel
+ .getSystemStreamPartitions
+ .toSet
info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
deleted file mode 100644
index da15346..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container
-
-import org.apache.samza.util.Logging
-import org.apache.samza.SamzaException
-import org.apache.samza.system.{SystemStream, SystemStreamPartition}
-import scala.collection.{immutable, Map, MapLike}
-
-/**
- * Map of {@link TaskName} to its set of {@link SystemStreamPartition}s with additional methods for aggregating
- * those SystemStreamPartitions' individual system, streams and partitions. Is useful for highlighting this
- * particular, heavily used map within the code.
- *
- * @param m Original map of TaskNames to SystemStreamPartitions
- */
-class TaskNamesToSystemStreamPartitions(m:Map[TaskName, Set[SystemStreamPartition]] = Map[TaskName, Set[SystemStreamPartition]]())
- extends Map[TaskName, Set[SystemStreamPartition]]
- with MapLike[TaskName, Set[SystemStreamPartition], TaskNamesToSystemStreamPartitions] with Logging {
-
- // Constructor
- validate
-
- // Methods
-
- // TODO: Get rid of public constructor, rely entirely on the companion object
- override def -(key: TaskName): TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions(m - key)
-
- override def +[B1 >: Set[SystemStreamPartition]](kv: (TaskName, B1)): Map[TaskName, B1] = new TaskNamesToSystemStreamPartitions(m + kv.asInstanceOf[(TaskName, Set[SystemStreamPartition])])
-
- override def iterator: Iterator[(TaskName, Set[SystemStreamPartition])] = m.iterator
-
- override def get(key: TaskName): Option[Set[SystemStreamPartition]] = m.get(key)
-
- override def empty: TaskNamesToSystemStreamPartitions = new TaskNamesToSystemStreamPartitions()
-
- override def seq: Map[TaskName, Set[SystemStreamPartition]] = m.seq
-
- override def foreach[U](f: ((TaskName, Set[SystemStreamPartition])) => U): Unit = m.foreach(f)
-
- override def size: Int = m.size
-
- /**
- * Validate that this is a legal mapping of TaskNames to SystemStreamPartitions. At the moment,
- * we only check that an SSP is included in the mapping at most once. We could add other,
- * pluggable validations here, or if we decided to allow an SSP to appear in the mapping more than
- * once, remove this limitation.
- */
- def validate():Unit = {
- // Convert sets of SSPs to lists, to preserve duplicates
- val allSSPs: List[SystemStreamPartition] = m.values.toList.map(_.toList).flatten
- val sspCountMap = allSSPs.groupBy(ssp => ssp) // Group all the SSPs together
- .map(ssp => (ssp._1 -> ssp._2.size)) // Turn into map -> count of that SSP
- .filter(ssp => ssp._2 != 1) // Filter out those that appear once
-
- if(!sspCountMap.isEmpty) {
- throw new SamzaException("Assigning the same SystemStreamPartition to multiple TaskNames is not currently supported." +
- " Out of compliance SystemStreamPartitions and counts: " + sspCountMap)
- }
-
- debug("Successfully validated TaskName to SystemStreamPartition set mapping:" + m)
- }
-
- /**
- * Return a set of all the SystemStreamPartitions for all the keys.
- *
- * @return All SystemStreamPartitions within this map
- */
- def getAllSSPs(): Iterable[SystemStreamPartition] = m.values.flatten
-
- /**
- * Return a set of all the Systems presents in the SystemStreamPartitions across all the keys
- *
- * @return All Systems within this map
- */
- def getAllSystems(): Set[String] = getAllSSPs.map(_.getSystemStream.getSystem).toSet
-
- /**
- * Return a set of all the Partition IDs in the SystemStreamPartitions across all the keys
- *
- * @return All Partition IDs within this map
- */
- def getAllPartitionIds(): Set[Int] = getAllSSPs.map(_.getPartition.getPartitionId).toSet
-
- /**
- * Return a set of all the Streams in the SystemStreamPartitions across all the keys
- *
- * @return All Streams within this map
- */
- def getAllStreams(): Set[String] = getAllSSPs.map(_.getSystemStream.getStream).toSet
-
- /**
- * Return a set of all the SystemStreams in the SystemStreamPartitions across all the keys
- *
- * @return All SystemStreams within this map
- */
- def getAllSystemStreams: Set[SystemStream] = getAllSSPs().map(_.getSystemStream).toSet
-
- // CommandBuilder needs to get a copy of this map and is a Java interface, therefore we can't just go straight
- // from this type to JSON (for passing into the command option.
- // Not super crazy about having the Java -> Scala and Scala -> Java methods in two different (but close) places:
- // here and in the apply method on the companion object. May be better to just have a conversion util, but would
- // be less clean. Life is cruel on the border of Scalapolis and Javatown.
- def getJavaFriendlyType: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]] = {
- import scala.collection.JavaConverters._
-
- m.map({case(k,v) => k -> v.asJava}).toMap.asJava
- }
-}
-
-object TaskNamesToSystemStreamPartitions {
- def apply() = new TaskNamesToSystemStreamPartitions()
-
- def apply(m: Map[TaskName, Set[SystemStreamPartition]]) = new TaskNamesToSystemStreamPartitions(m)
-
- /**
- * Convert from Java-happy type we obtain from the SSPTaskName factory
- *
- * @param m Java version of a map of sets of strings
- * @return Populated SSPTaskName map
- */
- def apply(m: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]) = {
- import scala.collection.JavaConversions._
-
- val rightType: immutable.Map[TaskName, Set[SystemStreamPartition]] = m.map({case(k,v) => k -> v.toSet}).toMap
-
- new TaskNamesToSystemStreamPartitions(rightType)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
index 7a3ba46..8071fec 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
@@ -16,35 +16,42 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.samza.container.grouper.task
-import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.job.model.ContainerModel
import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConversions._
/**
- * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames
- * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be).
- * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution
- * of the number of taskNames between containers, etc.
+ * Group tasks into containers by sorting them in TaskModel's natural order
+ * and assigning them round-robin: the task at sorted index i is placed in
+ * container i % n, where n is the number of containers. No consideration is
+ * given towards locality, even distribution of aggregate SSPs within a
+ * container, etc. Fails if there are no tasks, or if there are fewer tasks
+ * than containers.
*/
-class GroupByContainerCount(numContainers:Int) extends TaskNameGrouper {
+class GroupByContainerCount(numContainers: Int) extends TaskNameGrouper {
require(numContainers > 0, "Must have at least one container")
- override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
- val keySize = taskNames.keySet.size
- require(keySize > 0, "Must have some SSPs to group, but found none")
-
- // Iterate through the taskNames, round-robining them per container
- val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
- var idx = 0
- for(taskName <- taskNames.iterator) {
- val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
- idx = (idx + 1) % numContainers
-
- currMap += taskName
- }
+ override def group(tasks: Set[TaskModel]): Set[ContainerModel] = {
+ require(tasks.size > 0, "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.")
+ require(tasks.size >= numContainers, "Your container count (%s) is larger than your task count (%s). Can't have containers with nothing to do, so aborting." format (numContainers, tasks.size))
- byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
+ tasks
+ .toList
+ // Sort tasks by taskName.
+ .sortWith { case (task1, task2) => task1.compareTo(task2) < 0 }
+ // Assign every task an ID.
+ .zip(0 until tasks.size)
+ // Map every task to a container using its task ID.
+ .groupBy(_._2 % numContainers)
+ // Drop the task IDs and key each container's tasks by TaskName.
+ .mapValues(_.map { case (task, taskId) => (task.getTaskName, task) }.toMap)
+ .map { case (containerId, tasks) => new ContainerModel(containerId, tasks) }
+ .toSet
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
index 46e75b1..62e94ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
@@ -18,23 +18,34 @@
*/
package org.apache.samza.container.grouper.task
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.job.model.ContainerModel
/**
- * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of
- * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}, we can then map those groupings onto
- * the {@link org.apache.samza.container.SamzaContainer}s on which they will run. This class takes
- * those groupings-of-SSPs and groups them together on which container each should run on. A simple
- * implementation could assign each TaskNamesToSystemStreamPartition to a separate container. More
- * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them
- * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc.
+ * <p>
+ * After the input SystemStreamPartitions have been mapped to their tasks by an
+ * implementation of
+ * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper},
+ * we can then map those groupings onto the
+ * {@link org.apache.samza.container.SamzaContainer}s on which they will run.
+ * This class takes a set of TaskModels and groups them together into
+ * ContainerModels. All tasks within a single ContainerModel will be executed in
+ * a single SamzaContainer.
+ * </p>
+ *
+ * <p>
+ * A simple implementation could assign each TaskModel to a separate container.
+ * More advanced implementations could examine the TaskModel to group them by
+ * data locality, anti-affinity, even distribution of expected bandwidth
+ * consumption, etc.
+ * </p>
*/
trait TaskNameGrouper {
/**
- * Group TaskNamesToSystemStreamPartitions onto the containers they will share
+ * Group tasks into the containers they will share.
*
- * @param taskNames Pre-grouped SSPs
- * @return Mapping of container ID to set if TaskNames it will run
+ * @param tasks Set of tasks to group into containers.
+ * @return Set of containers, which contain the tasks that were passed in.
*/
- def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions]
+ def group(tasks: Set[TaskModel]): Set[ContainerModel]
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
new file mode 100644
index 0000000..c14f2f6
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator
+
+import org.apache.samza.config.Config
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.SamzaException
+import org.apache.samza.container.grouper.task.GroupByContainerCount
+import org.apache.samza.util.Util
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
+import java.util
+import org.apache.samza.container.TaskName
+import org.apache.samza.util.Logging
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.config.StorageConfig.Config2Storage
+import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.Partition
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import java.net.URL
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.coordinator.server.JobServlet
+
+object JobCoordinator extends Logging {
+ /**
+ * Build a JobCoordinator using a Samza job's configuration.
+ */
+ def apply(config: Config, containerCount: Int) = {
+ val jobModel = buildJobModel(config, containerCount)
+ val server = new HttpServer
+ server.addServlet("/*", new JobServlet(jobModel))
+ new JobCoordinator(jobModel, server)
+ }
+
+ /**
+ * Gets a CheckpointManager from the configuration.
+ */
+ def getCheckpointManager(config: Config) = {
+ config.getCheckpointManagerFactory match {
+ case Some(checkpointFactoryClassName) =>
+ Util
+ .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+ .getCheckpointManager(config, new MetricsRegistryMap)
+ case _ =>
+ if (!config.getStoreNames.isEmpty) {
+ throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified. " +
+ "Unable to start job as there would be no place to store changelog partition mapping.")
+ }
+ null
+ }
+ }
+
+ /**
+ * For each input stream specified in config, exactly determine its
+ * partitions, returning a set of SystemStreamPartitions containing them all.
+ */
+ def getInputStreamPartitions(config: Config) = {
+ val inputSystemStreams = config.getInputStreams
+ val systemNames = config.getSystemNames.toSet
+
+ // Map the name of each system to the corresponding SystemAdmin
+ val systemAdmins = systemNames.map(systemName => {
+ val systemFactoryClassName = config
+ .getSystemFactory(systemName)
+ .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+ val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ systemName -> systemFactory.getAdmin(systemName, config)
+ }).toMap
+
+ // Get the set of partitions for each SystemStream from the stream metadata
+ new StreamMetadataCache(systemAdmins)
+ .getStreamMetadata(inputSystemStreams)
+ .flatMap {
+ case (systemStream, metadata) =>
+ metadata
+ .getSystemStreamPartitionMetadata
+ .keys
+ .map(new SystemStreamPartition(systemStream, _))
+ }.toSet
+ }
+
+ /**
+ * Gets a SystemStreamPartitionGrouper object from the configuration.
+ */
+ def getSystemStreamPartitionGrouper(config: Config) = {
+ val factoryString = config.getSystemStreamPartitionGrouperFactory
+ val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
+ factory.getSystemStreamPartitionGrouper(config)
+ }
+
+ /**
+ * Build a full Samza job model using the job configuration.
+ */
+ def buildJobModel(config: Config, containerCount: Int) = {
+ // TODO containerCount should go away when we generalize the job coordinator,
+ // and have a non-yarn-specific way of specifying container count.
+ val checkpointManager = getCheckpointManager(config)
+ val allSystemStreamPartitions = getInputStreamPartitions(config)
+ val grouper = getSystemStreamPartitionGrouper(config)
+ val previousChangelogeMapping = if (checkpointManager != null) {
+ checkpointManager.start
+ checkpointManager.readChangeLogPartitionMapping
+ } else {
+ new util.HashMap[TaskName, java.lang.Integer]()
+ }
+ var maxChangelogPartitionId = previousChangelogeMapping
+ .values
+ .map(_.toInt)
+ .toList
+ .sorted
+ .lastOption
+ .getOrElse(-1)
+
+ // Assign all SystemStreamPartitions to TaskNames.
+ val taskModels = {
+ val groups = grouper.group(allSystemStreamPartitions)
+ info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+ groups
+ .map {
+ case (taskName, systemStreamPartitions) =>
+ val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
+ case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+ case _ =>
+ // If we've never seen this TaskName before, then assign it a
+ // new changelog.
+ maxChangelogPartitionId += 1
+ info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
+ new Partition(maxChangelogPartitionId)
+ }
+ new TaskModel(taskName, systemStreamPartitions, changelogPartition)
+ }
+ .toSet
+ }
+
+ // Save the changelog mapping back to the checkpoint manager.
+ if (checkpointManager != null) {
+ // newChangelogMapping is the merging of all current task:changelog
+ // assignments with whatever we had before (previousChangelogeMapping).
+ // We must persist legacy changelog assignments so that
+ // maxChangelogPartitionId always has the absolute max, not the current
+ // max (in case the task with the highest changelog partition mapping
+ // disappears).
+ val newChangelogMapping = taskModels.map(taskModel => {
+ taskModel.getTaskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+ }).toMap ++ previousChangelogeMapping
+ info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
+ checkpointManager.writeChangeLogPartitionMapping(newChangelogMapping)
+ checkpointManager.stop
+ }
+
+ // Here is where we should put in a pluggable option for the
+ // TaskNameGrouper for locality, load-balancing, etc.
+ val containerGrouper = new GroupByContainerCount(containerCount)
+ val containerModels = containerGrouper
+ .group(taskModels)
+ .map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }
+ .toMap
+
+ new JobModel(config, containerModels)
+ }
+}
+
+/**
+ * <p>JobCoordinator is responsible for managing the lifecycle of a Samza job
+ * once it's been started. This includes starting and stopping containers,
+ * managing configuration, etc.</p>
+ *
+ * <p>Any new cluster manager that's integrated with Samza (YARN, Mesos, etc)
+ * must integrate with the job coordinator.</p>
+ *
+ * <p>This class' API is currently unstable, and likely to change. Right now,
+ * the coordinator's only responsibility is to propagate the job model and to
+ * run the HTTP server that serves it.</p>
+ */
+class JobCoordinator(
+ /**
+ * The data model that describes the Samza job's containers and tasks.
+ */
+ val jobModel: JobModel,
+
+ /**
+ * HTTP server used to serve the Samza job's model to SamzaContainers when they start up.
+ */
+ val server: HttpServer) extends Logging {
+
+ debug("Got job model: %s." format jobModel)
+
+ def start {
+ debug("Starting HTTP server.")
+ server.start
+ info("Startd HTTP server: %s" format server.getUrl)
+ }
+
+ def stop {
+ debug("Stopping HTTP server.")
+ server.stop
+ info("Stopped HTTP server.")
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
index 7c0676c..10986a4 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
@@ -32,23 +32,64 @@ import org.eclipse.jetty.servlet.ServletHolder
import java.net.URL
import org.apache.samza.util.Logging
+/**
+ * <p>A Jetty-based HTTP server. The server allows arbitrary servlets to be added
+ * with the addServlet() method. The server is configured to automatically
+ * serve static CSS and JS from the /css and /js directories if a
+ * resourceBasePath is specified.</p>
+ */
class HttpServer(
+ /**
+ * All servlet paths will be served out of the rootPath. If rootPath is set
+ * to /foo, then all servlet paths will be served underneath /foo.
+ */
rootPath: String = "/",
+
+ /**
+ * The port that Jetty should bind to. If set to 0, Jetty will bind to a
+ * dynamically allocated free port on the machine it's running on. The port
+ * can be retrieved by calling .getUrl.
+ */
port: Int = 0,
+
+ /**
+ * If specified, tells Jetty where static resources are located inside
+ * WEB-INF. This allows HttpServer to serve arbitrary static files that are
+ * embedded in a JAR.
+ */
resourceBasePath: String = null,
+
+ /**
+ * The ServletHolder to use for static file (CSS/JS) serving.
+ */
defaultHolder: ServletHolder = new ServletHolder(classOf[DefaultServlet])) extends Logging {
+ var running = false
var servlets = Map[String, Servlet]()
val server = new Server(port)
val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
defaultHolder.setName("default")
+ /**
+ * <p>
+ * Register a servlet to be served by the Jetty container. Path can be
+ * wild-carded (e.g. /\* or /foo/\*), and is relative to the rootPath
+ * specified in the constructor.
+ * </p>
+ *
+ * <p>
+ * Servlets with path /bar/\* and rootPath /foo will be served under /foo/bar
+ * (e.g. http://localhost:PORT/foo/bar).
+ * </p>
+ *
+ * <p>
+ * This method only records the servlet in the servlets map; adding a second
+ * servlet with the same path replaces the first. NOTE(review): registration
+ * with Jetty presumably happens in start(), so servlets should be added
+ * before the server is started -- confirm against start().
+ * </p>
+ */
def addServlet(path: String, servlet: Servlet) {
debug("Adding servlet %s to path %s" format (servlet, path))
servlets += path -> servlet
}
+ /**
+ * Start the Jetty server, and begin serving content.
+ */
def start {
debug("Starting server with rootPath=%s port=%s resourceBasePath=%s" format (rootPath, port, resourceBasePath))
context.setContextPath(rootPath)
@@ -70,18 +111,30 @@ class HttpServer(
debug("Starting HttpServer.")
server.start()
+ running = true
info("Started HttpServer on: %s" format getUrl)
}
+ /**
+ * Shut down the Jetty server. The server is marked as not running first, so
+ * getUrl throws from the moment shutdown begins. The servlet context is
+ * then stopped, followed by the server itself.
+ */
def stop {
+ running = false
debug("Stopping server")
context.stop()
server.stop()
info("Stopped server")
}
+ /**
+ * Returns the URL for the root of the HTTP server. The URL is built from
+ * the local host's IP address, the local port of the server's first
+ * connector, and rootPath. Because the port is read from the connector, this
+ * reports the port that was actually bound, even when the server was
+ * constructed with port 0.
+ *
+ * @throws SamzaException if the server is not currently running (start has
+ * not completed, or stop has been called).
+ */
def getUrl = {
- val runningPort = server.getConnectors()(0).asInstanceOf[Connector].getLocalPort()
- new URL("http://" + InetAddress.getLocalHost().getHostAddress() + ":" + runningPort + rootPath)
+ if (running) {
+ val runningPort = server.getConnectors()(0).asInstanceOf[Connector].getLocalPort()
+ new URL("http://" + InetAddress.getLocalHost().getHostAddress() + ":" + runningPort + rootPath)
+ } else {
+ throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.")
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index d7841a6..635c353 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -19,50 +19,12 @@
package org.apache.samza.coordinator.server
-import org.apache.samza.config.Config
-import java.util.HashMap
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
-import org.apache.samza.util.JsonHelpers
-import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.JobModel
import org.apache.samza.util.Logging
-object JobServlet {
- val CONFIG = "config"
- val CONTAINERS = "containers"
- val TASK_CHANGELOG_MAPPING = "task-changelog-mappings"
-}
-
-class JobServlet(
- config: Config,
- containerToTaskMapping: Map[Int, TaskNamesToSystemStreamPartitions],
- taskToChangelogMapping: Map[TaskName, Int]) extends ServletBase with Logging {
- import JobServlet._
- import JsonHelpers._
-
- val javaSafeContainerToTaskMapping = buildTasksToSSPs
- val javaSafeTaskToChangelogMappings = convertTaskNameToChangeLogPartitionMapping(taskToChangelogMapping)
- val jsonMap = buildJsonMap
-
- debug("Built JSON map: %s" format jsonMap)
-
- protected def getObjectToWrite() = {
- jsonMap
- }
-
- private def buildTasksToSSPs = {
- val map = new HashMap[java.lang.Integer, java.util.HashMap[TaskName, java.util.ArrayList[SSPWrapper]]]
- containerToTaskMapping.foreach {
- case (containerId, taskNameToSSPs) =>
- map.put(Integer.valueOf(containerId), convertSystemStreamPartitionSet(taskNameToSSPs.getJavaFriendlyType))
- }
- map
- }
-
- private def buildJsonMap = {
- val map = new HashMap[String, Object]()
- map.put(CONFIG, config)
- map.put(CONTAINERS, javaSafeContainerToTaskMapping)
- map.put(TASK_CHANGELOG_MAPPING, javaSafeTaskToChangelogMappings)
- map
- }
+/**
+ * A servlet that dumps the job model for a Samza job.
+ */
+class JobServlet(jobModel: JobModel) extends ServletBase with Logging {
+ protected def getObjectToWrite() = jobModel
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
index c9bad90..2732cca 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/ServletBase.scala
@@ -19,25 +19,29 @@
package org.apache.samza.coordinator.server;
-import java.io.IOException;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+import java.io.IOException
+import javax.servlet.ServletException
+import javax.servlet.http.HttpServlet
+import javax.servlet.http.HttpServletRequest
+import javax.servlet.http.HttpServletResponse
import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.samza.serializers.model.SamzaObjectMapper
-object ServletBase {
- val JSON_MAPPER = new ObjectMapper()
-}
-
+/**
+ * A simple servlet helper that makes it easy to dump objects to JSON.
+ */
trait ServletBase extends HttpServlet {
- import ServletBase._
+ val mapper = SamzaObjectMapper.getObjectMapper()
override protected def doGet(request: HttpServletRequest, response: HttpServletResponse) {
response.setContentType("application/json")
response.setStatus(HttpServletResponse.SC_OK)
- JSON_MAPPER.writeValue(response.getWriter(), getObjectToWrite())
+ mapper.writeValue(response.getWriter(), getObjectToWrite())
}
+ /**
+ * Returns an object that should be fed to Jackson's ObjectMapper, and
+ * returned as an HTTP response.
+ */
protected def getObjectToWrite(): Object
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
index bd38955..7992885 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -32,18 +32,15 @@ import java.io.InputStreamReader
import java.io.InputStream
import java.io.OutputStream
import org.apache.samza.SamzaException
-import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.job.CommandBuilder
import scala.collection.JavaConversions._
-class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpServer) extends StreamJob with Logging {
+class ProcessJob(commandBuilder: CommandBuilder) extends StreamJob with Logging {
var jobStatus: Option[ApplicationStatus] = None
var process: Process = null
def submit: StreamJob = {
jobStatus = Some(New)
- server.start
- commandBuilder.setUrl(server.getUrl)
val waitForThreadStart = new CountDownLatch(1)
val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList)
@@ -66,7 +63,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
errThread.start
waitForThreadStart.countDown
process.waitFor
- shutdown
}
}
@@ -79,7 +75,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
def kill: StreamJob = {
process.destroy
jobStatus = Some(UnsuccessfulFinish);
- shutdown
ProcessJob.this
}
@@ -90,7 +85,7 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
try {
process.waitFor
} catch {
- case e: InterruptedException => shutdown
+ case e: InterruptedException => info("Got interrupt.", e)
}
}
}
@@ -112,10 +107,6 @@ class ProcessJob(commandBuilder: CommandBuilder, server: HttpServer = new HttpSe
}
def getStatus = jobStatus.getOrElse(null)
-
- private def shutdown {
- server.stop
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index b1e5237..6985af6 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -19,7 +19,7 @@
package org.apache.samza.job.local
-import org.apache.samza.container.{ TaskNamesToSystemStreamPartitions, SamzaContainer }
+import org.apache.samza.container.SamzaContainer
import org.apache.samza.util.Logging
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
@@ -29,48 +29,40 @@ import org.apache.samza.util.Util
import scala.collection.JavaConversions._
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.JobCoordinator
/**
- * Creates a stand alone ProcessJob with the specified config
+ * Creates a standalone ProcessJob with the specified config. The job runs a
+ * single container (ID 0) in its own process. The job coordinator's HTTP
+ * server is started and its URL is handed to the command builder, so the
+ * container can fetch its job model when it starts up.
*/
class ProcessJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config): StreamJob = {
- // Since we're local, there will only be a single task into which all the SSPs will be processed
- val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
- if (taskToTaskNames.size != 1) {
- throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
- }
-
- // So pull out that single TaskNamesToSystemStreamPartitions
- val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
- if (sspTaskName.size <= 0) {
- throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
- }
-
- val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames)
- info("got taskName for job %s" format sspTaskName)
-
- val server = new HttpServer()
- server.addServlet("/*", new JobServlet(config, taskToTaskNames, taskNameToChangeLogPartitionMapping))
+ import scala.util.control.NonFatal
+
+ // A local process job always has exactly one container.
+ val coordinator = JobCoordinator(config, 1)
- val commandBuilder: CommandBuilder = {
- config.getCommandClass match {
- case Some(cmdBuilderClassName) => {
- // A command class was specified, so we need to use a process job to
- // execute the command in its own process.
- Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
- }
- case _ => {
- info("Defaulting to ShellCommandBuilder")
- new ShellCommandBuilder
+ val commandBuilder = {
+ config.getCommandClass match {
+ case Some(cmdBuilderClassName) => {
+ // A command class was specified, so we need to use a process job to
+ // execute the command in its own process.
+ Util.getObj[CommandBuilder](cmdBuilderClassName)
+ }
+ case _ => {
+ info("Defaulting to ShellCommandBuilder")
+ new ShellCommandBuilder
+ }
+ }
}
- }
- commandBuilder
- .setConfig(config)
- .setId(0)
+ commandBuilder
+ .setConfig(config)
+ .setId(0)
- new ProcessJob(commandBuilder, server)
+
+ // The coordinator's HTTP server serves the job model to the container when
+ // it starts up, so the server must be running, and its URL set on the
+ // command builder, before the job is submitted. It is deliberately NOT
+ // stopped when getJob returns (previously a finally block stopped it
+ // before the job was even submitted, and the URL was never set).
+ // TODO(review): ProcessJob no longer receives the coordinator, so nothing
+ // stops this server when the process exits; consider passing it in.
+ try {
+ coordinator.start
+ commandBuilder.setUrl(coordinator.server.getUrl)
+ new ProcessJob(commandBuilder)
+ } catch {
+ case NonFatal(e) =>
+ // Don't leave the server running if the job could not be set up.
+ coordinator.stop
+ throw e
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 4d5f0d5..7504e6d 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -24,31 +24,20 @@ import org.apache.samza.SamzaException
import org.apache.samza.config.Config
import org.apache.samza.config.ShellCommandConfig._
import org.apache.samza.config.TaskConfig._
-import org.apache.samza.container.{TaskNamesToSystemStreamPartitions, SamzaContainer}
-import org.apache.samza.job.{StreamJob, StreamJobFactory}
+import org.apache.samza.container.SamzaContainer
+import org.apache.samza.job.{ StreamJob, StreamJobFactory }
import org.apache.samza.util.Util
import org.apache.samza.config.JobConfig._
+import org.apache.samza.coordinator.JobCoordinator
/**
* Creates a new Thread job with the given config
*/
class ThreadJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config): StreamJob = {
- // Since we're local, there will only be a single task into which all the SSPs will be processed
- val taskToTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, 1)
- if(taskToTaskNames.size != 1) {
- throw new SamzaException("Should only have a single task count but somehow got more " + taskToTaskNames.size)
- }
-
- // So pull out that single TaskNamesToSystemStreamPartitions
- val sspTaskName: TaskNamesToSystemStreamPartitions = taskToTaskNames.getOrElse(0, throw new SamzaException("Should have a 0 task number for the SSPs but somehow do not: " + taskToTaskNames))
- if (sspTaskName.size <= 0) {
- throw new SamzaException("No SystemStreamPartitions to process were detected for your input streams. It's likely that the system(s) specified don't know about the input streams: %s" format config.getInputStreams)
- }
-
- val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, taskToTaskNames)
- info("got taskName for job %s" format sspTaskName)
info("Creating a ThreadJob, which is only meant for debugging.")
+ val coordinator = JobCoordinator(config, 1)
+ val containerModel = coordinator.jobModel.getContainers.get(0)
// Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
config.getTaskOpts match {
@@ -56,8 +45,11 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
case _ => None
}
- // No command class was specified, so execute the job in this process
- // using a threaded job.
- new ThreadJob(SamzaContainer(0, sspTaskName, taskNameToChangeLogPartitionMapping, config))
+ try {
+ coordinator.start
+ new ThreadJob(SamzaContainer(containerModel, config))
+ } finally {
+ coordinator.stop
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6f595bed/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala b/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala
deleted file mode 100644
index e3f23b6..0000000
--- a/samza-core/src/main/scala/org/apache/samza/util/JsonHelpers.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util
-
-import org.apache.samza.container.TaskName
-import org.codehaus.jackson.map.ObjectMapper
-import org.apache.samza.system.SystemStreamPartition
-import org.codehaus.jackson.`type`.TypeReference
-import java.util
-import scala.collection.JavaConversions._
-import org.apache.samza.Partition
-import scala.reflect.BeanProperty
-import org.apache.samza.config.MapConfig
-import org.apache.samza.container.TaskNamesToSystemStreamPartitions
-
-/**
- * Working with Jackson and JSON in Scala is tricky. These helper methods are
- * used to convert objects back and forth in SamzaContainer, and the
- * JobServlet.
- */
-object JsonHelpers {
- // Jackson really hates Scala's classes, so we need to wrap up the SSP in a
- // form Jackson will take.
- class SSPWrapper(@BeanProperty var partition: java.lang.Integer = null,
- @BeanProperty var Stream: java.lang.String = null,
- @BeanProperty var System: java.lang.String = null) {
- def this() { this(null, null, null) }
- def this(ssp: SystemStreamPartition) { this(ssp.getPartition.getPartitionId, ssp.getSystemStream.getStream, ssp.getSystemStream.getSystem) }
- }
-
- def convertSystemStreamPartitionSet(sspTaskNames: java.util.Map[TaskName, java.util.Set[SystemStreamPartition]]): util.HashMap[TaskName, util.ArrayList[SSPWrapper]] = {
- val map = new util.HashMap[TaskName, util.ArrayList[SSPWrapper]]()
- for ((key, ssps) <- sspTaskNames) {
- val al = new util.ArrayList[SSPWrapper](ssps.size)
- for (ssp <- ssps) { al.add(new SSPWrapper(ssp)) }
- map.put(key, al)
- }
- map
- }
-
- def convertTaskNameToChangeLogPartitionMapping(mapping: Map[TaskName, Int]): util.HashMap[TaskName, java.lang.Integer] = {
- val javaMap = new util.HashMap[TaskName, java.lang.Integer]()
- mapping.foreach(kv => javaMap.put(kv._1, Integer.valueOf(kv._2)))
- javaMap
- }
-
- def deserializeCoordinatorBody(body: String) = new ObjectMapper().readValue(body, new TypeReference[util.HashMap[String, Object]] {}).asInstanceOf[util.HashMap[String, Object]]
-
- def convertCoordinatorConfig(config: util.Map[String, String]) = new MapConfig(config)
-
- def convertCoordinatorTaskNameChangelogPartitions(taskNameToChangelogMapping: util.Map[String, java.lang.Integer]) = {
- taskNameToChangelogMapping.map {
- case (taskName, changelogPartitionId) =>
- (new TaskName(taskName), changelogPartitionId.toInt)
- }.toMap
- }
-
- // First key is containerId, second key is TaskName, third key is
- // [system|stream|partition].
- def convertCoordinatorSSPTaskNames(containers: util.Map[String, util.Map[String, util.List[util.Map[String, Object]]]]): Map[Int, TaskNamesToSystemStreamPartitions] = {
- containers.map {
- case (containerId, tasks) => {
- containerId.toInt -> new TaskNamesToSystemStreamPartitions(tasks.map {
- case (taskName, ssps) => {
- new TaskName(taskName) -> ssps.map {
- case (sspMap) => new SystemStreamPartition(
- sspMap.get("system").toString,
- sspMap.get("stream").toString,
- new Partition(sspMap.get("partition").toString.toInt))
- }.toSet
- }
- }.toMap)
- }
- }.toMap
- }
-}
\ No newline at end of file