You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/11/02 23:07:48 UTC
[3/3] samza git commit: SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream
SAMZA-798 : Performance and stability issue after combining checkpoint and coordinator stream
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/eba9b28f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/eba9b28f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/eba9b28f
Branch: refs/heads/master
Commit: eba9b28f34874b1d9a7e467d8a6046dc5357b4d5
Parents: 8677a27
Author: Navina <na...@gmail.com>
Authored: Mon Nov 2 13:59:42 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Mon Nov 2 13:59:42 2015 -0800
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 23 +
.../org/apache/samza/checkpoint/Checkpoint.java | 72 ++++
.../samza/checkpoint/CheckpointManager.java | 53 +++
.../checkpoint/CheckpointManagerFactory.java | 30 ++
.../org/apache/samza/checkpoint/Checkpoint.java | 72 ----
.../samza/checkpoint/CheckpointManager.java | 93 ----
.../stream/messages/SetCheckpoint.java | 74 ----
.../org/apache/samza/job/model/TaskModel.java | 19 +-
.../serializers/model/JsonTaskModelMixIn.java | 9 +-
.../samza/checkpoint/CheckpointTool.scala | 13 +-
.../apache/samza/checkpoint/OffsetManager.scala | 77 ++--
.../file/FileSystemCheckpointManager.scala | 87 ++++
.../apache/samza/container/SamzaContainer.scala | 18 +-
.../samza/coordinator/JobCoordinator.scala | 38 +-
.../samza/migration/JobRunnerMigration.scala | 18 +-
.../MockCoordinatorStreamWrappedConsumer.java | 16 +-
.../model/TestSamzaObjectMapper.java | 9 +-
.../samza/checkpoint/TestCheckpointTool.scala | 6 +-
.../samza/checkpoint/TestOffsetManager.scala | 16 +-
.../file/TestFileSystemCheckpointManager.scala | 86 ++++
.../samza/container/TestSamzaContainer.scala | 26 +-
.../task/TestGroupByContainerCount.scala | 2 +-
.../samza/coordinator/TestJobCoordinator.scala | 104 +----
.../org/apache/samza/job/TestJobRunner.scala | 4 +-
.../apache/samza/job/local/TestProcessJob.scala | 2 +-
.../old/checkpoint/KafkaCheckpointLogKey.scala | 188 --------
.../old/checkpoint/KafkaCheckpointManager.scala | 337 ---------------
.../KafkaCheckpointManagerFactory.scala | 108 -----
.../checkpoint/KafkaCheckpointMigration.scala | 94 ----
.../kafka/KafkaCheckpointLogKey.scala | 194 +++++++++
.../kafka/KafkaCheckpointManager.scala | 320 ++++++++++++++
.../kafka/KafkaCheckpointManagerFactory.scala | 101 +++++
.../org/apache/samza/config/KafkaConfig.scala | 10 +
.../migration/KafkaCheckpointMigration.scala | 147 +++++++
.../scala/org/apache/samza/util/KafkaUtil.scala | 114 ++++-
.../apache/samza/util/KafkaUtilException.scala | 31 ++
.../checkpoint/TestKafkaCheckpointManager.scala | 430 -------------------
.../kafka/TeskKafkaCheckpointLogKey.scala | 71 +++
.../kafka/TestKafkaCheckpointManager.scala | 244 +++++++++++
.../TestKafkaCheckpointMigration.scala | 243 +++++++++++
.../samza/job/yarn/TestContainerAllocator.java | 2 +-
.../yarn/TestHostAwareContainerAllocator.java | 2 +-
.../samza/job/yarn/TestSamzaTaskManager.java | 2 +-
.../job/yarn/TestSamzaAppMasterLifecycle.scala | 2 +-
44 files changed, 1974 insertions(+), 1633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 4adac09..b5d3813 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1471,6 +1471,29 @@
<td class="default">268435456</td>
<td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
</tr>
+
+ <tr>
+ <th colspan="3" class="section" id="task-migration">
+ Migrating from Samza 0.9.1 to 0.10.0<br>
+ <span class="subtitle">
+ (This section applies if you are upgrading from Samza 0.9.1 to 0.10.0 and have set
+ <a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a> to anything <b> other than </b>
+ <code>org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory</code>)
+ </span>
+ </th>
+ </tr>
+
+ <tr>
+ <td class="property" id="task-checkpoint-skip-migration">task.checkpoint.skip-migration</td>
+ <td class="default">false</td>
+ <td class="description">
+ When migrating from 0.9.1 to 0.10.0, the taskName-to-changelog partition mapping was moved from the checkpoint stream to the coordinator stream. <br />
+ If you are using a checkpoint manager other than kafka
+ (<code>org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory</code>), you have to
+ manually migrate taskName-to-changelog partition mapping to the coordinator stream. <br />
+ This can be achieved with the assistance of the <code>checkpoint-tool.sh</code>.
+ </td>
+ </tr>
</tbody>
</table>
</body>
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
new file mode 100644
index 0000000..593d118
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+public class Checkpoint {
+ private final Map<SystemStreamPartition, String> offsets;
+
+ /**
+ * Constructs a new checkpoint based off a map of Samza stream offsets.
+ * @param offsets Map of Samza streams to their current offset.
+ */
+ public Checkpoint(Map<SystemStreamPartition, String> offsets) {
+ this.offsets = offsets;
+ }
+
+ /**
+ * Gets a unmodifiable view of the current Samza stream offsets.
+ * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+ */
+ public Map<SystemStreamPartition, String> getOffsets() {
+ return Collections.unmodifiableMap(offsets);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Checkpoint)) return false;
+
+ Checkpoint that = (Checkpoint) o;
+
+ if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return offsets != null ? offsets.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "Checkpoint [offsets=" + offsets + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
new file mode 100644
index 0000000..dc14beb
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.container.TaskName;
+
+/**
+ * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
+ * implementation-specific location.
+ */
+public interface CheckpointManager {
+ void start();
+
+ /**
+ * Registers this manager to write checkpoints of a specific Samza stream partition.
+ * @param taskName Specific Samza taskName of which to write checkpoints for.
+ */
+ void register(TaskName taskName);
+
+ /**
+ * Writes a checkpoint based on the current state of a Samza stream partition.
+ * @param taskName Specific Samza taskName of which to write a checkpoint of.
+ * @param checkpoint Reference to a Checkpoint object to store offset data in.
+ */
+ void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);
+
+ /**
+ * Returns the last recorded checkpoint for a specified taskName.
+ * @param taskName Specific Samza taskName for which to get the last checkpoint of.
+ * @return A Checkpoint object with the recorded offset data of the specified partition.
+ */
+ Checkpoint readLastCheckpoint(TaskName taskName);
+
+ void stop();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
new file mode 100644
index 0000000..fe480b5
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
+ */
+public interface CheckpointManagerFactory {
+ public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
deleted file mode 100644
index 593d118..0000000
--- a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
- private final Map<SystemStreamPartition, String> offsets;
-
- /**
- * Constructs a new checkpoint based off a map of Samza stream offsets.
- * @param offsets Map of Samza streams to their current offset.
- */
- public Checkpoint(Map<SystemStreamPartition, String> offsets) {
- this.offsets = offsets;
- }
-
- /**
- * Gets a unmodifiable view of the current Samza stream offsets.
- * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
- */
- public Map<SystemStreamPartition, String> getOffsets() {
- return Collections.unmodifiableMap(offsets);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Checkpoint)) return false;
-
- Checkpoint that = (Checkpoint) o;
-
- if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return offsets != null ? offsets.hashCode() : 0;
- }
-
- @Override
- public String toString() {
- return "Checkpoint [offsets=" + offsets + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
deleted file mode 100644
index 0185751..0000000
--- a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
-import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses
- * CoordinatorStream underneath to do this.
- */
-public class CheckpointManager extends AbstractCoordinatorStreamManager {
-
- private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
- private final Map<TaskName, Checkpoint> taskNamesToOffsets;
- private final HashSet<TaskName> taskNames;
-
- public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
- CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
- String source) {
- super(coordinatorStreamProducer, coordinatorStreamConsumer, source);
- taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
- taskNames = new HashSet<TaskName>();
- }
-
- /**
- * Registers this manager to write checkpoints of a specific Samza stream partition.
- * @param taskName Specific Samza taskName of which to write checkpoints for.
- */
- public void register(TaskName taskName) {
- log.debug("Adding taskName {} to {}", taskName, this);
- taskNames.add(taskName);
- registerCoordinatorStreamConsumer();
- registerCoordinatorStreamProducer(taskName.getTaskName());
- }
-
- /**
- * Writes a checkpoint based on the current state of a Samza stream partition.
- * @param taskName Specific Samza taskName of which to write a checkpoint of.
- * @param checkpoint Reference to a Checkpoint object to store offset data in.
- */
- public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
- log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
- send(new SetCheckpoint(getSource(), taskName.getTaskName(), checkpoint));
- }
-
- /**
- * Returns the last recorded checkpoint for a specified taskName.
- * @param taskName Specific Samza taskName for which to get the last checkpoint of.
- * @return A Checkpoint object with the recorded offset data of the specified partition.
- */
- public Checkpoint readLastCheckpoint(TaskName taskName) {
- // Bootstrap each time to make sure that we are caught up with the stream, the bootstrap will just catch up on consecutive calls
- log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
- for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetCheckpoint.TYPE)) {
- SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
- TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
- if (taskNames.contains(taskNameInCheckpoint)) {
- taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint());
- log.debug("Adding checkpoint {} for taskName {}", taskNameInCheckpoint, taskName);
- }
- }
- return taskNamesToOffsets.get(taskName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
deleted file mode 100644
index 21afa85..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream.messages;
-
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Util;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The SetCheckpoint is used to store the checkpoint messages for a particular task.
- * The structure looks like:
- * {
- * Key: TaskName
- * Type: set-checkpoint
- * Source: ContainerID
- * MessageMap:
- * {
- * SSP1 : offset,
- * SSP2 : offset
- * }
- * }
- */
-public class SetCheckpoint extends CoordinatorStreamMessage {
- public static final String TYPE = "set-checkpoint";
-
- public SetCheckpoint(CoordinatorStreamMessage message) {
- super(message.getKeyArray(), message.getMessageMap());
- }
-
- /**
- * The SetCheckpoint is used to store checkpoint message for a given task.
- *
- * @param source The source writing the checkpoint
- * @param key The key for the checkpoint message (Typically task name)
- * @param checkpoint Checkpoint message to be written to the stream
- */
- public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
- super(source);
- setType(TYPE);
- setKey(key);
- Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
- for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
- putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
- }
- }
-
- public Checkpoint getCheckpoint() {
- Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
- for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
- offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
- }
- return new Checkpoint(offsetMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index e00c49d..59bf2e0 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -20,7 +20,6 @@
package org.apache.samza.job.model;
import java.util.Collections;
-import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.container.TaskName;
@@ -41,12 +40,12 @@ import org.apache.samza.system.SystemStreamPartition;
*/
public class TaskModel implements Comparable<TaskModel> {
private final TaskName taskName;
- private final Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets;
+ private final Set<SystemStreamPartition> systemStreamPartitions;
private final Partition changelogPartition;
- public TaskModel(TaskName taskName, Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, Partition changelogPartition) {
+ public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
this.taskName = taskName;
- this.systemStreamPartitionsToOffsets = Collections.unmodifiableMap(systemStreamPartitionsToOffsets);
+ this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
this.changelogPartition = changelogPartition;
}
@@ -55,17 +54,13 @@ public class TaskModel implements Comparable<TaskModel> {
}
public Set<SystemStreamPartition> getSystemStreamPartitions() {
- return systemStreamPartitionsToOffsets.keySet();
+ return systemStreamPartitions;
}
public Partition getChangelogPartition() {
return changelogPartition;
}
- public Map<SystemStreamPartition, String> getCheckpointedOffsets() {
- return systemStreamPartitionsToOffsets;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -80,7 +75,7 @@ public class TaskModel implements Comparable<TaskModel> {
if (!changelogPartition.equals(taskModel.changelogPartition)) {
return false;
}
- if (!systemStreamPartitionsToOffsets.equals(taskModel.systemStreamPartitionsToOffsets)) {
+ if (!systemStreamPartitions.equals(taskModel.systemStreamPartitions)) {
return false;
}
if (!taskName.equals(taskModel.taskName)) {
@@ -93,7 +88,7 @@ public class TaskModel implements Comparable<TaskModel> {
@Override
public int hashCode() {
int result = taskName.hashCode();
- result = 31 * result + systemStreamPartitionsToOffsets.hashCode();
+ result = 31 * result + systemStreamPartitions.hashCode();
result = 31 * result + changelogPartition.hashCode();
return result;
}
@@ -101,7 +96,7 @@ public class TaskModel implements Comparable<TaskModel> {
@Override
public String toString() {
- return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitionsToOffsets.keySet() + ", changeLogPartition=" + changelogPartition + "]";
+ return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
}
public int compareTo(TaskModel other) {
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 172358a..3ebe391 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -19,7 +19,8 @@
package org.apache.samza.serializers.model;
-import java.util.Map;
+import java.util.Set;
+
import org.apache.samza.Partition;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
@@ -31,14 +32,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
*/
public abstract class JsonTaskModelMixIn {
@JsonCreator
- public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions-offsets") Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, @JsonProperty("changelog-partition") Partition changelogPartition) {
+ public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
}
@JsonProperty("task-name")
abstract TaskName getTaskName();
- @JsonProperty("system-stream-partitions-offsets")
- abstract Map<SystemStreamPartition, String> getCheckpointedOffsets();
+ @JsonProperty("system-stream-partitions")
+ abstract Set<SystemStreamPartition> getSystemStreamPartitions();
@JsonProperty("changelog-partition")
abstract Partition getChangelogPartition();
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 2e3aeb8..31b208f 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -29,10 +29,9 @@ import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.CommandLine
+import org.apache.samza.util.{Util, CommandLine, Logging}
import org.apache.samza.{Partition, SamzaException}
import scala.collection.JavaConversions._
-import org.apache.samza.util.Logging
import org.apache.samza.coordinator.JobCoordinator
import scala.collection.immutable.HashMap
@@ -118,10 +117,12 @@ object CheckpointTool {
}
def apply(config: Config, offsets: TaskNameToCheckpointMap) = {
- val factory = new CoordinatorStreamSystemFactory
- val coordinatorStreamConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap())
- val coordinatorStreamProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap())
- val manager = new CheckpointManager(coordinatorStreamProducer, coordinatorStreamConsumer, "checkpoint-tool")
+ val manager = config.getCheckpointManagerFactory match {
+ case Some(className) =>
+ Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
+ case _ =>
+ throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
+ }
new CheckpointTool(config, offsets, manager)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 1464acc..00648e4 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,7 @@ object OffsetManager extends Logging {
config: Config,
checkpointManager: CheckpointManager = null,
systemAdmins: Map[String, SystemAdmin] = Map(),
- offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
- latestOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) = {
+ offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = {
debug("Building offset manager for %s." format systemStreamMetadata)
val offsetSettings = systemStreamMetadata
@@ -99,7 +98,7 @@ object OffsetManager extends Logging {
// Build OffsetSetting so we can create a map for OffsetManager.
(systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
}.toMap
- new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics, latestOffsets)
+ new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
}
}
@@ -142,22 +141,12 @@ class OffsetManager(
/**
* offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
*/
- val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
-
- /*
- * The previously read checkpoints restored from the coordinator stream
- */
- val previousCheckpointedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) extends Logging {
+ val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) extends Logging {
/**
* Last offsets processed for each SystemStreamPartition.
*/
- // Filter out null offset values, we can't use them, these exist only because of SSP information
- var lastProcessedOffsets = previousCheckpointedOffsets.map {
- case (taskName, sspToOffset) => {
- taskName -> sspToOffset.filter(_._2 != null)
- }
- }
+ var lastProcessedOffsets = Map[TaskName, Map[SystemStreamPartition, String]]()
/**
* Offsets to start reading from for each SystemStreamPartition. This
@@ -175,13 +164,12 @@ class OffsetManager(
def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) {
systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister)
// register metrics
- systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
+ systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach (ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
}
def start {
registerCheckpointManager
- initializeCheckpointManager
- loadOffsets
+ loadOffsetsFromCheckpointManager
stripResetStreams
loadStartingOffsets
loadDefaults
@@ -269,32 +257,51 @@ class OffsetManager(
}
}
- private def initializeCheckpointManager {
+ /**
+ * Loads last processed offsets from checkpoint manager for all registered
+ * partitions.
+ */
+ private def loadOffsetsFromCheckpointManager {
if (checkpointManager != null) {
+ debug("Loading offsets from checkpoint manager.")
+
checkpointManager.start
+ val result = systemStreamPartitions
+ .keys
+ .flatMap(restoreOffsetsFromCheckpoint(_))
+ .toMap
+ lastProcessedOffsets ++= result.map {
+ case (taskName, sspToOffset) => {
+ taskName -> sspToOffset.filter {
+ case (systemStreamPartition, offset) =>
+ val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
+ if (!shouldKeep) {
+ info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
+ }
+ info("Checkpointed offset is currently %s for %s" format (offset, systemStreamPartition))
+ shouldKeep
+ }
+ }
+ }
} else {
debug("Skipping offset load from checkpoint manager because no manager was defined.")
}
}
/**
- * Loads last processed offsets from checkpoint manager for all registered
- * partitions.
+ * Loads last processed offsets for a single taskName.
*/
- private def loadOffsets {
- debug("Loading offsets")
- lastProcessedOffsets.map {
- case (taskName, sspToOffsets) => {
- taskName -> sspToOffsets.filter {
- case (systemStreamPartition, offset) =>
- val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
- if (!shouldKeep) {
- info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
- }
- info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
- shouldKeep
- }
- }
+ private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, Map[SystemStreamPartition, String]] = {
+ debug("Loading checkpoints for taskName: %s." format taskName)
+
+ val checkpoint = checkpointManager.readLastCheckpoint(taskName)
+
+ if (checkpoint != null) {
+ Map(taskName -> checkpoint.getOffsets.toMap)
+ } else {
+ info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
+
+ Map(taskName -> Map())
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
new file mode 100644
index 0000000..edd0ace
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.file
+
+import java.io.ByteArrayOutputStream
+import java.io.File
+import java.io.FileInputStream
+import java.io.FileNotFoundException
+import java.io.FileOutputStream
+import java.util
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.config.Config
+import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.container.TaskName
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.serializers.CheckpointSerde
+import scala.io.Source
+
+/**
+ * Checkpoint manager that stores the latest checkpoint of each task in its own
+ * file under a root directory.
+ *
+ * @param jobName job name used as the first component of each checkpoint file name
+ * @param root directory in which the checkpoint files are created
+ * @param serde serde used to convert checkpoints to and from bytes
+ */
+class FileSystemCheckpointManager(
+    jobName: String,
+    root: File,
+    serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager {
+
+  /** No-op: this manager keeps no per-task state. */
+  override def register(taskName: TaskName): Unit = {}
+
+  /** Returns the file that holds the checkpoint of the given task. */
+  def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
+
+  /**
+   * Serializes the checkpoint and writes it to the task's checkpoint file,
+   * replacing any previous content.
+   */
+  def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+    val bytes = serde.toBytes(checkpoint)
+    val fos = new FileOutputStream(getCheckpointFile(taskName))
+
+    // Close the stream even when the write fails so the file handle is not leaked.
+    try {
+      fos.write(bytes)
+    } finally {
+      fos.close
+    }
+  }
+
+  /**
+   * Reads and deserializes the checkpoint stored for the given task.
+   *
+   * @return the stored checkpoint, or null when the checkpoint file does not
+   *         exist or cannot be opened for reading
+   */
+  def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+    try {
+      serde.fromBytes(readBytes(getCheckpointFile(taskName)))
+    } catch {
+      case e: FileNotFoundException => null
+    }
+  }
+
+  /**
+   * Verifies that the root directory exists.
+   *
+   * @throws SamzaException if the root directory does not exist
+   */
+  def start {
+    if (!root.exists) {
+      throw new SamzaException("Root directory for file system checkpoint manager does not exist: %s" format root)
+    }
+  }
+
+  /** No-op: nothing is kept open between calls. */
+  def stop {}
+
+  // Builds the path <root>/<jobName>-<taskName>-<fileType>.
+  private def getFile(jobName: String, taskName: TaskName, fileType:String) =
+    new File(root, "%s-%s-%s" format (jobName, taskName, fileType))
+
+  /**
+   * Reads the raw bytes of a file. The bytes are read directly rather than
+   * through a character decoder so that bytes outside the ASCII range reach
+   * the serde unchanged, and the stream is always closed.
+   */
+  private def readBytes(file: File): Array[Byte] = {
+    val in = new FileInputStream(file)
+
+    try {
+      val out = new ByteArrayOutputStream
+      val buffer = new Array[Byte](4096)
+
+      Iterator
+        .continually(in.read(buffer))
+        .takeWhile(_ != -1)
+        .foreach(count => out.write(buffer, 0, count))
+      out.toByteArray
+    } finally {
+      in.close
+    }
+  }
+}
+
+/**
+ * Factory that builds a FileSystemCheckpointManager from the job name and the
+ * file system checkpoint root found in the config.
+ */
+class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
+  /**
+   * @throws SamzaException if the job name or the checkpoint root is missing from the config
+   */
+  def getCheckpointManager(config: Config, registry: MetricsRegistry) = {
+    val name = config
+      .getName
+      .getOrElse(throw new SamzaException("Missing job name in configs"))
+    val root = config
+      .getFileSystemCheckpointRoot
+      .getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
+    new FileSystemCheckpointManager(name, new File(root))
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 0b73403..3787b85 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,8 +21,7 @@ package org.apache.samza.container
import java.io.File
import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{ CheckpointManager, OffsetManager }
-import org.apache.samza.config.Config
+import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.config.SerializerConfig.Config2Serializer
import org.apache.samza.config.ShellCommandConfig
@@ -61,7 +60,6 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.config.JobConfig.Config2Job
import java.lang.Thread.UncaughtExceptionHandler
-import org.apache.samza.checkpoint.OffsetManagerMetrics
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -308,15 +306,17 @@ object SamzaContainer extends Logging {
val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
- val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
-
+ val checkpointManager = config.getCheckpointManagerFactory() match {
+ case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
+ Util
+ .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
+ .getCheckpointManager(config, samzaContainerMetrics.registry)
+ case _ => null
+ }
info("Got checkpoint manager: %s" format checkpointManager)
- val combinedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] =
- containerModel.getTasks.map{case (taskName, taskModel) => taskName -> mapAsScalaMap(taskModel.getCheckpointedOffsets).toMap }.toMap
-
- val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
+ val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
info("Got offset manager: %s" format offsetManager)
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 03299cb..ef40c35 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -74,7 +74,6 @@ object JobCoordinator extends Logging {
coordinatorSystemConsumer.bootstrap
val config = coordinatorSystemConsumer.getConfig
info("Got config: %s" format config)
- val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
@@ -91,7 +90,7 @@ object JobCoordinator extends Logging {
val streamMetadataCache = new StreamMetadataCache(systemAdmins)
- val jobCoordinator = getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager, localityManager, streamMetadataCache)
+ val jobCoordinator = getJobCoordinator(rewriteConfig(config), changelogManager, localityManager, streamMetadataCache)
createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
jobCoordinator
@@ -103,14 +102,13 @@ object JobCoordinator extends Logging {
* Build a JobCoordinator using a Samza job's configuration.
*/
def getJobCoordinator(config: Config,
- checkpointManager: CheckpointManager,
changelogManager: ChangelogPartitionManager,
localityManager: LocalityManager,
streamMetadataCache: StreamMetadataCache) = {
- val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager, streamMetadataCache)
+ val jobModelGenerator = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
val server = new HttpServer
server.addServlet("/*", new JobServlet(jobModelGenerator))
- currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server, checkpointManager)
+ currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server)
currentJobCoordinator
}
@@ -170,7 +168,6 @@ object JobCoordinator extends Logging {
* which catchup with the latest content from the coordinator stream.
*/
private def initializeJobModel(config: Config,
- checkpointManager: CheckpointManager,
changelogManager: ChangelogPartitionManager,
localityManager: LocalityManager,
streamMetadataCache: StreamMetadataCache): () => JobModel = {
@@ -192,10 +189,6 @@ object JobCoordinator extends Logging {
{
new util.HashMap[TaskName, Integer]()
}
-
- checkpointManager.start()
- groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
-
// We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
// TODO: This code will go away with refactoring - SAMZA-678
@@ -204,7 +197,6 @@ object JobCoordinator extends Logging {
// Generate the jobModel
def jobModelGenerator(): JobModel = refreshJobModel(config,
allSystemStreamPartitions,
- checkpointManager,
groups,
previousChangelogMapping,
localityManager)
@@ -238,7 +230,6 @@ object JobCoordinator extends Logging {
*/
private def refreshJobModel(config: Config,
allSystemStreamPartitions: util.Set[SystemStreamPartition],
- checkpointManager: CheckpointManager,
groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
previousChangelogMapping: util.Map[TaskName, Integer],
localityManager: LocalityManager): JobModel = {
@@ -253,17 +244,6 @@ object JobCoordinator extends Logging {
{
groups.map
{ case (taskName, systemStreamPartitions) =>
- val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
- // Find the system partitions which don't have a checkpoint and set null for the values for offsets
- val taskOffsets = checkpoint.getOffsets
- val offsetMap = new util.HashMap[SystemStreamPartition, String]()
- systemStreamPartitions.foreach {
- ssp =>
- if(taskOffsets.containsKey(ssp))
- offsetMap.put(ssp, taskOffsets.get(ssp))
- else
- offsetMap.put(ssp, null)
- }
val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
{
case Some(changelogPartitionId) => new Partition(changelogPartitionId)
@@ -274,7 +254,7 @@ object JobCoordinator extends Logging {
info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
new Partition(maxChangelogPartitionId)
}
- new TaskModel(taskName, offsetMap, changelogPartition)
+ new TaskModel(taskName, systemStreamPartitions, changelogPartition)
}.toSet
}
@@ -336,12 +316,7 @@ class JobCoordinator(
/**
* HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
*/
- val server: HttpServer = null,
-
- /**
- * Handle to checkpoint manager that's used to refresh the JobModel
- */
- val checkpointManager: CheckpointManager) extends Logging {
+ val server: HttpServer = null) extends Logging {
debug("Got job model: %s." format jobModel)
@@ -358,9 +333,6 @@ class JobCoordinator(
debug("Stopping HTTP server.")
server.stop
info("Stopped HTTP server.")
- debug("Stopping checkpoint manager.")
- checkpointManager.stop()
- info("Stopped checkpoint manager.")
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
index 374e27e..f38b87a 100644
--- a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
+++ b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
@@ -24,7 +24,10 @@ import org.apache.samza.SamzaException
object JobRunnerMigration {
- val CHECKPOINTMIGRATION = "old.checkpoint.KafkaCheckpointMigration"
+ val CHECKPOINT_MIGRATION = "org.apache.samza.migration.KafkaCheckpointMigration"
+ val UNSUPPORTED_ERROR_MSG = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
+ "for everything else, please use the checkpoint tool to migrate the taskname-to-changelog mapping, and add " +
+ "task.checkpoint.skip-migration=true to your configs."
def apply(config: Config) = {
val migration = new JobRunnerMigration
migration.checkpointMigration(config)
@@ -38,15 +41,18 @@ class JobRunnerMigration extends Logging {
checkpointFactory match {
case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") =>
info("Performing checkpoint migration")
- val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINTMIGRATION)
+ val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINT_MIGRATION)
checkpointMigrationPlan.migrate(config)
case None =>
info("No task.checkpoint.factory defined, not performing any checkpoint migration")
case _ =>
- val errorMsg = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
- "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration"
- error(errorMsg)
- throw new SamzaException(errorMsg)
+ val skipMigration = config.getBoolean("task.checkpoint.skip-migration", false)
+ if (skipMigration) {
+ info("Job is configured to skip any checkpoint migration.")
+ } else {
+ error(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
+ throw new SamzaException(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index dd04d28..429573b 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -20,22 +20,18 @@
package org.apache.samza.coordinator.stream;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.samza.SamzaException;
-import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
-import org.apache.samza.util.Util;
import org.codehaus.jackson.map.ObjectMapper;
/**
@@ -47,7 +43,6 @@ import org.codehaus.jackson.map.ObjectMapper;
public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
public final static String CHANGELOGPREFIX = "ch:";
- public final static String CHECKPOINTPREFIX = "cp:";
public final CountDownLatch blockConsumerPoll = new CountDownLatch(1);
public boolean blockpollFlag = false;
@@ -78,16 +73,7 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
for (Map.Entry<String, String> configPair : config.entrySet()) {
byte[] keyBytes = null;
byte[] messgeBytes = null;
- if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) {
- String[] checkpointInfo = configPair.getKey().split(":");
- String[] sspOffsetPair = configPair.getValue().split(":");
- HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
- checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
- Checkpoint cp = new Checkpoint(checkpointMap);
- SetCheckpoint setCheckpoint = new SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
- keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
- messgeBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
- } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
+ if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
String[] changelogInfo = configPair.getKey().split(":");
String changeLogPartition = configPair.getValue();
SetChangelogMapping changelogMapping = new SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index ad1fbc5..2c64598 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -21,7 +21,10 @@ package org.apache.samza.serializers.model;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -38,12 +41,12 @@ public class TestSamzaObjectMapper {
public void testJsonTaskModel() throws Exception {
ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
Map<String, String> configMap = new HashMap<String, String>();
- Map<SystemStreamPartition, String> sspOffset = new HashMap<SystemStreamPartition, String>();
+ Set<SystemStreamPartition> ssp = new HashSet<>();
configMap.put("a", "b");
Config config = new MapConfig(configMap);
TaskName taskName = new TaskName("test");
- sspOffset.put(new SystemStreamPartition("foo", "bar", new Partition(1)), "");
- TaskModel taskModel = new TaskModel(taskName, sspOffset, new Partition(2));
+ ssp.add(new SystemStreamPartition("foo", "bar", new Partition(1)));
+ TaskModel taskModel = new TaskModel(taskName, ssp, new Partition(2));
Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
tasks.put(taskName, taskModel);
ContainerModel containerModel = new ContainerModel(1, tasks);
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 00b8977..0865b31 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -41,7 +41,7 @@ object TestCheckpointTool {
var systemProducer: SystemProducer = null
var systemAdmin: SystemAdmin = null
- class MockCheckpointManagerFactory {
+ class MockCheckpointManagerFactory extends CheckpointManagerFactory {
def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
}
@@ -87,7 +87,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
@Test
def testReadLatestCheckpoint {
- val checkpointTool = new CheckpointTool(config, null, TestCheckpointTool.checkpointManager)
+ val checkpointTool = CheckpointTool(config, null)
checkpointTool.run
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
@@ -99,7 +99,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
- val checkpointTool = new CheckpointTool(config, toOverwrite, TestCheckpointTool.checkpointManager)
+ val checkpointTool = CheckpointTool(config, toOverwrite)
checkpointTool.run
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index c00ef91..75ba8af 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -65,7 +65,7 @@ class TestOffsetManager {
val config = new MapConfig
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val systemAdmins = Map("test-system" -> getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
assertTrue(checkpointManager.isStarted)
@@ -97,7 +97,7 @@ class TestOffsetManager {
val config = new MapConfig
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val systemAdmins = Map("test-system" -> getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
// Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
@@ -242,17 +242,17 @@ class TestOffsetManager {
private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
- new CheckpointManager(null, null, null) {
+ new CheckpointManager {
var isStarted = false
var isStopped = false
var registered = Set[TaskName]()
var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
- override def start { isStarted = true }
- override def register(taskName: TaskName) { registered += taskName }
- override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
- override def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
- override def stop { isStopped = true }
+ def start { isStarted = true }
+ def register(taskName: TaskName) { registered += taskName }
+ def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+ def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
+ def stop { isStopped = true }
// Only for testing purposes - not present in actual checkpoint manager
def getOffets = Map(taskName -> mapAsScalaMap(checkpoint.getOffsets()).toMap)
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
new file mode 100644
index 0000000..4ca738e
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.file
+
+import java.io.File
+import scala.collection.JavaConversions._
+import java.util.Random
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.apache.samza.SamzaException
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.container.TaskName
+import org.junit.rules.TemporaryFolder
+
+/**
+ * Tests FileSystemCheckpointManager against a temporary folder that is created
+ * before and deleted after every test.
+ */
+class TestFileSystemCheckpointManager {
+  val taskName = new TaskName("Warwickshire")
+
+  // Created and deleted by the @Before/@After methods below.
+  val tempFolder = new TemporaryFolder
+
+  @Before
+  def createTempFolder = tempFolder.create()
+
+  @After
+  def deleteTempFolder = tempFolder.delete()
+
+  @Test
+  def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+    assertNull(cpm.readLastCheckpoint(taskName))
+  }
+
+  @Test
+  def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
+    val cp = new Checkpoint(Map(
+      new SystemStreamPartition("a", "b", new Partition(0)) -> "c",
+      new SystemStreamPartition("a", "c", new Partition(1)) -> "d",
+      new SystemStreamPartition("b", "d", new Partition(2)) -> "e"))
+
+    val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
+
+    cpm.start
+    cpm.writeCheckpoint(taskName, cp)
+    val readCp = cpm.readLastCheckpoint(taskName)
+    cpm.stop
+
+    assertNotNull(readCp)
+    assertEquals(cp.getOffsets.keySet(), readCp.getOffsets.keySet())
+    assertEquals(cp.getOffsets, readCp.getOffsets)
+    assertEquals(cp, readCp)
+  }
+
+  @Test
+  def testMissingRootDirectoryShouldFailOnManagerCreation {
+    // This child of the freshly created temporary folder is never created, so it cannot exist.
+    val missingRoot = new File(tempFolder.getRoot, "missing-checkpoint-root")
+    val cpm = new FileSystemCheckpointManager("some-job-name", missingRoot)
+    try {
+      cpm.start
+      fail("Expected an exception since root directory for fs checkpoint manager doesn't exist.")
+    } catch {
+      case e: SamzaException => () // this is expected
+    }
+    cpm.stop
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a77ddc7..d91b1da 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -54,7 +54,7 @@ import org.scalatest.junit.AssertionsForJUnit
import java.lang.Thread.UncaughtExceptionHandler
import org.apache.samza.serializers._
import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
class TestSamzaContainer extends AssertionsForJUnit {
@Test
@@ -63,15 +63,15 @@ class TestSamzaContainer extends AssertionsForJUnit {
val offsets = new util.HashMap[SystemStreamPartition, String]()
offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
val tasks = Map(
- new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)),
- new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(0)))
+ new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
+ new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
val containers = Map(
Integer.valueOf(0) -> new ContainerModel(0, tasks),
Integer.valueOf(1) -> new ContainerModel(1, tasks))
val jobModel = new JobModel(config, containers)
def jobModelGenerator(): JobModel = jobModel
val server = new HttpServer
- val coordinator = new JobCoordinator(jobModel, server, new MockCheckpointManager)
+ val coordinator = new JobCoordinator(jobModel, server)
coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator))
try {
coordinator.start
@@ -87,12 +87,12 @@ class TestSamzaContainer extends AssertionsForJUnit {
val offsets = new util.HashMap[SystemStreamPartition, String]()
offsets.put(new SystemStreamPartition("system", "stream", new Partition(0)), "1")
val tasksForContainer1 = Map(
- new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)),
- new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(1)))
+ new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
+ new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(1)))
val tasksForContainer2 = Map(
- new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets, new Partition(2)),
- new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets, new Partition(3)),
- new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets, new Partition(4)))
+ new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets.keySet(), new Partition(2)),
+ new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets.keySet(), new Partition(3)),
+ new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets.keySet(), new Partition(4)))
val containerModel1 = new ContainerModel(0, tasksForContainer1)
val containerModel2 = new ContainerModel(1, tasksForContainer2)
val containers = Map(
@@ -204,7 +204,13 @@ class TestSamzaContainer extends AssertionsForJUnit {
}
}
-class MockCheckpointManager extends CheckpointManager(null, null, "Unknown") {
+class MockCheckpointManager extends CheckpointManager {
override def start() = {}
override def stop() = {}
+
+ override def register(taskName: TaskName): Unit = {}
+
+ override def readLastCheckpoint(taskName: TaskName): Checkpoint = { new Checkpoint(Map[SystemStreamPartition, String]()) }
+
+ override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = { }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
index ddf1fde..6e9c6fa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -71,6 +71,6 @@ class TestGroupByContainerCount {
}
private def getTaskModel(name: String, partitionId: Int) = {
- new TaskModel(new TaskName(name), Map[SystemStreamPartition, String](), new Partition(partitionId))
+ new TaskModel(new TaskName(name), Set[SystemStreamPartition](), new Partition(partitionId))
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 1393da8..80cccf3 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,19 +19,14 @@
package org.apache.samza.coordinator
-
-import java.net.SocketTimeoutException
-
-import org.apache.samza.util.Util
import org.junit.{After, Test}
import org.junit.Assert._
-import org.junit.rules.ExpectedException
import scala.collection.JavaConversions._
import org.apache.samza.config.MapConfig
import org.apache.samza.config.TaskConfig
import org.apache.samza.config.SystemConfig
import org.apache.samza.container.{SamzaContainer, TaskName}
-import org.apache.samza.metrics.{MetricsRegistryMap, MetricsRegistry}
+import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.Config
import org.apache.samza.system.SystemFactory
import org.apache.samza.system.SystemAdmin
@@ -67,20 +62,16 @@ class TestJobCoordinator {
// Construct the expected JobModel, so we can compare it to
// JobCoordinator's JobModel.
val container0Tasks = Map(
- task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
- task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
+ task0Name -> new TaskModel(task0Name, checkpoint0.keySet, new Partition(4)),
+ task2Name -> new TaskModel(task2Name, checkpoint2.keySet, new Partition(5)))
val container1Tasks = Map(
- task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
+ task1Name -> new TaskModel(task1Name, checkpoint1.keySet, new Partition(3)))
val containers = Map(
Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
// The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition
- val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
- task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
- val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
- task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
@@ -88,8 +79,6 @@ class TestJobCoordinator {
// Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
// SetCheckpoint and SetChangelog
val otherConfigs = Map(
- checkpointOffset0,
- checkpointOffset1,
changelogInfo0,
changelogInfo1,
changelogInfo2
@@ -116,43 +105,32 @@ class TestJobCoordinator {
}
@Test
- def testJobCoordinatorCheckpointing = {
+ def testJobCoordinatorChangelogPartitionMapping = {
System.out.println("test ")
val task0Name = new TaskName("Partition 0")
- val checkpoint0 = Map(new SystemStreamPartition("test", "stream1", new Partition(0)) -> "4")
+ val ssp0 = Set(new SystemStreamPartition("test", "stream1", new Partition(0)))
val task1Name = new TaskName("Partition 1")
- val checkpoint1 = Map(new SystemStreamPartition("test", "stream1", new Partition(1)) ->"3")
+ val ssp1 = Set(new SystemStreamPartition("test", "stream1", new Partition(1)))
val task2Name = new TaskName("Partition 2")
- val checkpoint2 = Map(new SystemStreamPartition("test", "stream1", new Partition(2)) -> "8")
+ val ssp2 = Set(new SystemStreamPartition("test", "stream1", new Partition(2)))
// Construct the expected JobModel, so we can compare it to
// JobCoordinator's JobModel.
val container0Tasks = Map(
- task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
- task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
+ task0Name -> new TaskModel(task0Name, ssp0, new Partition(4)),
+ task2Name -> new TaskModel(task2Name, ssp1, new Partition(5)))
val container1Tasks = Map(
- task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
+ task1Name -> new TaskModel(task1Name, ssp1, new Partition(3)))
val containers = Map(
Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
-
- // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition
- val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
- task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
- val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
- task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
- val checkpointOffset2 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
- task2Name.getTaskName() -> (Util.sspToString(checkpoint2.keySet.iterator.next()) + ":" + checkpoint2.values.iterator.next())
val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
- val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
- val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
// Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
// SetCheckpoint and SetChangelog
// Write the changelog partition mapping for task0 that the job coordinator will process
val otherConfigs = Map(
- checkpointOffset0,
changelogInfo0
)
@@ -175,62 +153,22 @@ class TestJobCoordinator {
val url = coordinator.server.getUrl.toString
// Verify that the jobCoordinator serves the expected changelog partition mapping
- var offsets = extractOffsetsFromJobCoordinator(url)
- assertEquals(1, offsets.size)
- assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
-
- // Write more checkpoints
- val wrappedConsumer = new MockCoordinatorStreamSystemFactory()
- .getConsumer(null, null, null)
- .asInstanceOf[MockCoordinatorStreamWrappedConsumer]
-
- var moreMessageConfigs = Map(
- checkpointOffset1
- )
-
- wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
-
- // Verify if the coordinator has seen it
- offsets = extractOffsetsFromJobCoordinator(url)
- assertEquals(2, offsets.size)
- assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
- assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
-
- // Write more checkpoints but block on read on the mock consumer
- moreMessageConfigs = Map(
- checkpointOffset2
- )
-
- wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
-
- // Simulate consumer being blocked (Job coordinator waiting to read new checkpoints from coordinator after container failure)
- val latch = wrappedConsumer.blockPool();
-
- // verify if the port times out
- var seenException = false
- try {
- extractOffsetsFromJobCoordinator(url)
- }
- catch {
- case se: SocketTimeoutException => seenException = true
- }
- assertTrue(seenException)
+ val changelogPartitionMapping = extractChangelogPartitionMapping(url)
+ assertEquals(3, changelogPartitionMapping.size)
+ val expectedChangelogPartitionMapping = Map(task0Name -> 4, task1Name -> 5, task2Name -> 6)
+ assertEquals(expectedChangelogPartitionMapping.get(task0Name), changelogPartitionMapping.get(task0Name))
+ assertEquals(expectedChangelogPartitionMapping.get(task1Name), changelogPartitionMapping.get(task1Name))
+ assertEquals(expectedChangelogPartitionMapping.get(task2Name), changelogPartitionMapping.get(task2Name))
- // verify if it has read the new checkpoints after job coordinator has loaded the new checkpoints
- latch.countDown()
- offsets = extractOffsetsFromJobCoordinator(url)
- assertEquals(offsets.size, 3)
- assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
- assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
- assertEquals(checkpoint2.head._2, offsets.getOrElse(checkpoint2.head._1, fail()))
coordinator.stop
}
- def extractOffsetsFromJobCoordinator(url : String) = {
+ def extractChangelogPartitionMapping(url : String) = {
val jobModel = SamzaContainer.readJobModel(url.toString)
val taskModels = jobModel.getContainers.values().flatMap(_.getTasks.values())
- val offsets = taskModels.flatMap(_.getCheckpointedOffsets).toMap
- offsets.filter(_._2 != null)
+ taskModels.map{taskModel => {
+ taskModel.getTaskName -> taskModel.getChangelogPartition.getPartitionId
+ }}.toMap
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 9036e81..e97656a 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -24,6 +24,7 @@ import java.io.File
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.migration.JobRunnerMigration
import org.junit.Test
import org.junit.After
import org.junit.Assert._
@@ -51,8 +52,7 @@ class TestJobRunner {
"file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath))
fail("Should have failed already.")
} catch {
- case se: SamzaException => assertEquals(se.getMessage, "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
- "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration")
+ case se: SamzaException => assertEquals(se.getMessage, JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index a1efe6f..3a710a8 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -56,7 +56,7 @@ class TestProcessJob {
}
}
-class MockJobCoordinator extends JobCoordinator(null, null, null) {
+class MockJobCoordinator extends JobCoordinator(null, null) {
var stopped: Boolean = false
override def start: Unit = { }