You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/29 04:57:51 UTC
[4/4] samza git commit: SAMZA-465: Use coordinator stream and
eliminate CheckpointManager
SAMZA-465: Use coordinator stream and eliminate CheckpointManager
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/23fb2e1c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/23fb2e1c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/23fb2e1c
Branch: refs/heads/master
Commit: 23fb2e1c097317845439ec0abb938821a6106969
Parents: c37d752
Author: Naveen Somasundaram <na...@gmail.com>
Authored: Tue Apr 28 19:57:06 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-ld1.linkedin.biz>
Committed: Tue Apr 28 19:57:06 2015 -0700
----------------------------------------------------------------------
build.gradle | 6 +-
.../org/apache/samza/checkpoint/Checkpoint.java | 72 ---
.../samza/checkpoint/CheckpointManager.java | 70 ---
.../checkpoint/CheckpointManagerFactory.java | 30 --
.../org/apache/samza/container/TaskName.java | 4 +-
.../org/apache/samza/system/SystemAdmin.java | 29 +-
.../org/apache/samza/task/TaskCoordinator.java | 2 +-
...inglePartitionWithoutOffsetsSystemAdmin.java | 5 +
.../org/apache/samza/checkpoint/Checkpoint.java | 72 +++
.../samza/checkpoint/CheckpointManager.java | 116 +++++
.../stream/CoordinatorStreamMessage.java | 476 +++++++++++++++++++
.../stream/CoordinatorStreamSystemConsumer.java | 197 ++++++++
.../stream/CoordinatorStreamSystemProducer.java | 134 ++++++
.../org/apache/samza/job/model/TaskModel.java | 72 +--
.../serializers/model/JsonTaskModelMixIn.java | 8 +-
.../serializers/model/SamzaObjectMapper.java | 24 +
.../storage/ChangelogPartitionManager.java | 115 +++++
.../samza/checkpoint/CheckpointTool.scala | 34 +-
.../apache/samza/checkpoint/OffsetManager.scala | 58 +--
.../file/FileSystemCheckpointManager.scala | 107 -----
.../org/apache/samza/config/JobConfig.scala | 63 ++-
.../samza/config/ShellCommandConfig.scala | 4 +-
.../org/apache/samza/config/StreamConfig.scala | 2 -
.../org/apache/samza/config/TaskConfig.scala | 2 +-
.../apache/samza/container/SamzaContainer.scala | 18 +-
.../samza/coordinator/JobCoordinator.scala | 270 +++++++----
.../samza/coordinator/server/HttpServer.scala | 3 +-
.../samza/coordinator/server/JobServlet.scala | 7 +-
.../stream/CoordinatorStreamSystemFactory.scala | 50 ++
.../scala/org/apache/samza/job/JobRunner.scala | 77 +--
.../samza/job/local/ProcessJobFactory.scala | 9 +-
.../samza/job/local/ThreadJobFactory.scala | 5 +-
.../apache/samza/serializers/JsonSerde.scala | 30 +-
.../filereader/FileReaderSystemAdmin.scala | 8 +-
.../main/scala/org/apache/samza/util/Util.scala | 131 ++++-
.../MockCoordinatorStreamSystemFactory.java | 118 +++++
.../MockCoordinatorStreamWrappedConsumer.java | 128 +++++
.../stream/TestCoordinatorStreamMessage.java | 66 +++
.../TestCoordinatorStreamSystemConsumer.java | 133 ++++++
.../TestCoordinatorStreamSystemProducer.java | 153 ++++++
.../model/TestSamzaObjectMapper.java | 6 +-
samza-core/src/test/resources/test.properties | 1 +
.../samza/checkpoint/TestCheckpointTool.scala | 20 +-
.../samza/checkpoint/TestOffsetManager.scala | 50 +-
.../file/TestFileSystemCheckpointManager.scala | 86 ----
.../samza/container/TestSamzaContainer.scala | 21 +-
.../task/TestGroupByContainerCount.scala | 2 +-
.../samza/coordinator/TestJobCoordinator.scala | 238 ++++++++--
.../coordinator/server/TestHttpServer.scala | 2 +-
.../apache/samza/job/local/TestProcessJob.scala | 2 +-
.../samza/serializers/TestJsonSerde.scala | 4 +-
.../kafka/KafkaCheckpointLogKey.scala | 186 --------
.../kafka/KafkaCheckpointManager.scala | 427 -----------------
.../kafka/KafkaCheckpointManagerFactory.scala | 116 -----
.../org/apache/samza/config/KafkaConfig.scala | 10 -
.../samza/config/RegExTopicGenerator.scala | 2 +-
.../samza/system/kafka/KafkaSystemAdmin.scala | 59 ++-
.../samza/system/kafka/KafkaSystemFactory.scala | 23 +-
.../kafka/TestKafkaCheckpointLogKey.scala | 71 ---
.../kafka/TestKafkaCheckpointManager.scala | 211 --------
.../system/kafka/TestKafkaSystemAdmin.scala | 26 +-
.../samza/logging/log4j/StreamAppender.java | 7 +-
.../src/main/config/join/common.properties | 8 +-
.../src/main/config/negate-number.properties | 3 +
.../perf/container-performance.properties | 12 +-
.../kafka-read-write-performance.properties | 3 +
.../samza/system/mock/MockSystemAdmin.java | 18 +-
.../src/main/python/samza_failure_testing.py | 2 +
.../test/performance/TestPerformanceTask.scala | 4 +-
.../test/integration/TestStatefulTask.scala | 17 +-
.../org/apache/samza/config/YarnConfig.scala | 3 -
.../apache/samza/job/yarn/SamzaAppMaster.scala | 12 +-
.../samza/job/yarn/SamzaAppMasterState.scala | 3 +-
.../job/yarn/SamzaAppMasterTaskManager.scala | 12 +-
.../org/apache/samza/job/yarn/YarnJob.scala | 9 +-
.../job/yarn/TestSamzaAppMasterLifecycle.scala | 8 +-
.../job/yarn/TestSamzaAppMasterService.scala | 15 +-
.../yarn/TestSamzaAppMasterTaskManager.scala | 42 +-
78 files changed, 2815 insertions(+), 1834 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 97de3a2..ebad6eb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -123,7 +123,9 @@ project(':samza-api') {
project(":samza-core_$scalaVersion") {
apply plugin: 'scala'
-
+ // Force scala joint compilation
+ sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.main.java.srcDirs = []
jar {
manifest {
attributes("Implementation-Version": "$version")
@@ -239,6 +241,7 @@ project(":samza-yarn_$scalaVersion") {
compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
+ testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
}
repositories {
@@ -384,6 +387,7 @@ project(":samza-test_$scalaVersion") {
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
testCompile "com.101tec:zkclient:$zkClientVersion"
testCompile project(":samza-kafka_$scalaVersion")
+ testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
deleted file mode 100644
index 593d118..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
- private final Map<SystemStreamPartition, String> offsets;
-
- /**
- * Constructs a new checkpoint based off a map of Samza stream offsets.
- * @param offsets Map of Samza streams to their current offset.
- */
- public Checkpoint(Map<SystemStreamPartition, String> offsets) {
- this.offsets = offsets;
- }
-
- /**
- * Gets a unmodifiable view of the current Samza stream offsets.
- * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
- */
- public Map<SystemStreamPartition, String> getOffsets() {
- return Collections.unmodifiableMap(offsets);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Checkpoint)) return false;
-
- Checkpoint that = (Checkpoint) o;
-
- if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return offsets != null ? offsets.hashCode() : 0;
- }
-
- @Override
- public String toString() {
- return "Checkpoint [offsets=" + offsets + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
deleted file mode 100644
index 092cb91..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.container.TaskName;
-
-import java.util.Map;
-
-/**
- * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
- * implementation-specific location.
- */
-public interface CheckpointManager {
- public void start();
-
- /**
- * Registers this manager to write checkpoints of a specific Samza stream partition.
- * @param taskName Specific Samza taskName of which to write checkpoints for.
- */
- public void register(TaskName taskName);
-
- /**
- * Writes a checkpoint based on the current state of a Samza stream partition.
- * @param taskName Specific Samza taskName of which to write a checkpoint of.
- * @param checkpoint Reference to a Checkpoint object to store offset data in.
- */
- public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);
-
- /**
- * Returns the last recorded checkpoint for a specified taskName.
- * @param taskName Specific Samza taskName for which to get the last checkpoint of.
- * @return A Checkpoint object with the recorded offset data of the specified partition.
- */
- public Checkpoint readLastCheckpoint(TaskName taskName);
-
- /**
- * Read the taskName to partition mapping that is being maintained by this CheckpointManager
- *
- * @return TaskName to task log partition mapping, or an empty map if there were no messages.
- */
- public Map<TaskName, Integer> readChangeLogPartitionMapping();
-
- /**
- * Write the taskName to partition mapping that is being maintained by this CheckpointManager
- *
- * @param mapping Each TaskName's partition within the changelog
- */
- public void writeChangeLogPartitionMapping(Map<TaskName, Integer> mapping);
-
- public void stop();
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
deleted file mode 100644
index a97ff09..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-
-/**
- * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
- */
-public interface CheckpointManagerFactory {
- public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/container/TaskName.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
index 0833586..aa19293 100644
--- a/samza-api/src/main/java/org/apache/samza/container/TaskName.java
+++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
@@ -21,7 +21,9 @@ package org.apache.samza.container;
/**
* A unique identifier of a set of a SystemStreamPartitions that have been grouped by
* a {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}. The
- * SystemStreamPartitionGrouper determines the TaskName for each set it creates.
+ * SystemStreamPartitionGrouper determines the TaskName for each set it creates. The TaskName class
+ * should contain only the taskName. This is necessary because the ChangelogManager assumes that the taskName
+ * alone uniquely identifies an instance of this class, and uses it as the key in the underlying coordinator stream.
*/
public class TaskName implements Comparable<TaskName> {
private final String taskName;
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 8995ba3..7a588eb 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -23,10 +23,9 @@ import java.util.Map;
import java.util.Set;
/**
- * Helper interface attached to an underlying system to fetch
- * information about streams, partitions, offsets, etc. This interface is useful
- * for providing utility methods that Samza needs in order to interact with a
- * system.
+ * Helper interface attached to an underlying system to fetch information about
+ * streams, partitions, offsets, etc. This interface is useful for providing
+ * utility methods that Samza needs in order to interact with a system.
*/
public interface SystemAdmin {
@@ -51,10 +50,22 @@ public interface SystemAdmin {
*/
Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
- /**
- * An API to create a change log stream
- * @param streamName The name of the stream to be created in the underlying stream
- * @param numOfPartitions The number of partitions in the changelog stream
- */
+ /**
+ * An API to create a change log stream
+ *
+ * @param streamName
+ * The name of the stream to be created in the underlying stream
+ * @param numOfPartitions
+ * The number of partitions in the changelog stream
+ */
void createChangelogStream(String streamName, int numOfPartitions);
+
+ /**
+ * Create a stream for the job coordinator. If the stream already exists, this
+ * call should simply return.
+ *
+ * @param streamName
+ * The name of the coordinator stream to create.
+ */
+ void createCoordinatorStream(String streamName);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
index 6ff1a55..e7bf6f1 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
@@ -22,7 +22,7 @@ package org.apache.samza.task;
/**
* TaskCoordinators are provided to the process methods of {@link org.apache.samza.task.StreamTask} implementations
* to allow the user code to request actions from the Samza framework, including committing the current checkpoints
- * to configured {@link org.apache.samza.checkpoint.CheckpointManager}s or shutting down the task or all tasks within
+ * to the configured org.apache.samza.checkpoint.CheckpointManager, or shutting down the task or all tasks within
* a container.
* <p>
* This interface may evolve over time.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 01997ae..a6b14fb 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -69,4 +69,9 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
return offsetsAfter;
}
+
+ @Override
+ public void createCoordinatorStream(String streamName) {
+ throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
new file mode 100644
index 0000000..593d118
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+public class Checkpoint {
+ private final Map<SystemStreamPartition, String> offsets;
+
+ /**
+ * Constructs a new checkpoint based off a map of Samza stream offsets.
+ * @param offsets Map of Samza streams to their current offset.
+ */
+ public Checkpoint(Map<SystemStreamPartition, String> offsets) {
+ this.offsets = offsets;
+ }
+
+ /**
+ * Gets a unmodifiable view of the current Samza stream offsets.
+ * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+ */
+ public Map<SystemStreamPartition, String> getOffsets() {
+ return Collections.unmodifiableMap(offsets);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Checkpoint)) return false;
+
+ Checkpoint that = (Checkpoint) o;
+
+ if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return offsets != null ? offsets.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "Checkpoint [offsets=" + offsets + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
new file mode 100644
index 0000000..3ac63ca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses
+ * CoordinatorStream underneath to do this.
+ */
+public class CheckpointManager {
+
+ private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
+ private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+ private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+ private final Map<TaskName, Checkpoint> taskNamesToOffsets;
+ private final HashSet<TaskName> taskNames;
+ private String source;
+
+ public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+ this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+ this.coordinatorStreamProducer = coordinatorStreamProducer;
+ taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
+ taskNames = new HashSet<TaskName>();
+ this.source = "Unknown";
+ }
+
+ public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+ String source) {
+ this(coordinatorStreamProducer, coordinatorStreamConsumer);
+ this.source = source;
+ }
+
+ public void start() {
+ coordinatorStreamProducer.start();
+ coordinatorStreamConsumer.start();
+ }
+
+ /**
+ * Registers a Samza task with this manager so that its checkpoints can be written and read.
+ * @param taskName Specific Samza taskName for which to write checkpoints.
+ */
+ public void register(TaskName taskName) {
+ log.debug("Adding taskName {} to {}", taskName, this);
+ taskNames.add(taskName);
+ coordinatorStreamConsumer.register();
+ coordinatorStreamProducer.register(taskName.getTaskName());
+ }
+
+ /**
+ * Writes a checkpoint for a Samza task to the coordinator stream.
+ * @param taskName Specific Samza taskName for which to write a checkpoint.
+ * @param checkpoint The Checkpoint object holding the offsets to persist.
+ */
+ public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
+ log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
+ SetCheckpoint checkPointMessage = new SetCheckpoint(source, taskName.getTaskName(), checkpoint);
+ coordinatorStreamProducer.send(checkPointMessage);
+ }
+
+ /**
+ * Returns the last recorded checkpoint for a specified taskName.
+ * @param taskName Specific Samza taskName whose last checkpoint should be returned.
+ * @return A Checkpoint object with the recorded offset data of the specified task, or null if no checkpoint was found for it.
+ */
+ public Checkpoint readLastCheckpoint(TaskName taskName) {
+ // Bootstrap each time to make sure that we are caught up with the stream; on consecutive calls the bootstrap just catches up
+ log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
+ Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetCheckpoint.TYPE);
+ for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+ SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
+ TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
+ if(taskNames.contains(taskNameInCheckpoint)) {
+ taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint());
+ log.debug("Adding checkpoint {} for taskName {}", taskNameInCheckpoint, taskName);
+ }
+ }
+ return taskNamesToOffsets.get(taskName);
+ }
+
+ public void stop() {
+ coordinatorStreamConsumer.stop();
+ coordinatorStreamProducer.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
new file mode 100644
index 0000000..f8b705f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Represents a message for the job coordinator. All messages in the coordinator
+ * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
+ * messages are modeled as key/value pairs. The key is a list of well defined
+ * fields: version, type, and key. The value is a map. There are some
+ * pre-defined fields (such as timestamp, host, etc) for the value map, which
+ * are common to all messages.
+ * </p>
+ *
+ * <p>
+ * The full structure for a CoordinatorStreamMessage is:
+ * </p>
+ *
+ * <pre>
+ * key => [1, "set-config", "job.name"]
+ *
+ * message => {
+ * "host": "192.168.0.1",
+ * "username": "criccomini",
+ * "source": "job-runner",
+ * "timestamp": 123456789,
+ * "values": {
+ * "value": "my-job-name"
+ * }
+ * }
+ * </pre>
+ *
+ * Where the key's structure is:
+ *
+ * <pre>
+ * key => [<version>, <type>, <key>]
+ * </pre>
+ *
+ * <p>
+ * Note that the white space in the above JSON blobs are done for legibility.
+ * Over the wire, the JSON should be compact, and no unnecessary white space
+ * should be used. This is extremely important for key serialization, since a
+ * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
+ * be evaluated as two different keys, and Kafka will not log compact them (if
+ * Kafka is used as the underlying system for a coordinator stream).
+ * </p>
+ *
+ * <p>
+ * The "values" map in the message is defined on a per-message-type basis. For
+ * set-config messages, there is just a single key/value pair, where the "value"
+ * key is defined. For offset messages, there will be multiple key/values pairs
+ * in "values" (one for each SystemStreamPartition/offset pair for a given
+ * TaskName).
+ * </p>
+ *
+ * <p>
+ * The most important fields are type, key, and values. The type field (defined
+ * as index 1 in the key list) defines the kind of message, the key (defined as
+ * index 2 in the key list) defines a key to associate with the values, and the
+ * values map defines a set of values associated with the type. A concrete
+ * example would be a config message of type "set-config" with key "job.name"
+ * and values {"value": "my-job-name"}.
+ * </p>
+ */
+public class CoordinatorStreamMessage {
+ // Positions of the version, type and key fields inside keyArray. Declared final so they cannot be reassigned at runtime.
+ public static final int VERSION_INDEX = 0;
+ public static final int TYPE_INDEX = 1;
+ public static final int KEY_INDEX = 2;
+
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
+
+ /**
+ * Protocol version for coordinator stream messages. This version number must
+ * be incremented any time new messages are added to the coordinator stream,
+ * or changes are made to the key/message headers.
+ */
+ public static final int VERSION = 1;
+
+ /**
+ * Contains all key fields. Currently, this includes the type of the message,
+ * the key associated with the type (e.g. type: set-config key: job.name), and
+ * the version of the protocol. The indices are defined as the INDEX static
+ * variables above.
+ */
+ private final Object[] keyArray;
+
+ /**
+ * Contains all fields for the message. This includes who sent the message,
+ * the host, etc. It also includes a "values" map, which contains all values
+ * associated with the key of the message. If set-config/job.name were used as
+ * the type/key of the message, then values would contain
+ * {"value":"my-job-name"}.
+ */
+ private final Map<String, Object> messageMap;
+ private boolean isDelete;
+
+ public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
+ this(message.getKeyArray(), message.getMessageMap());
+ }
+
+ public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
+ this.keyArray = keyArray;
+ this.messageMap = messageMap;
+ this.isDelete = messageMap == null;
+ }
+
+ public CoordinatorStreamMessage(String source) {
+ this(source, new Object[] { Integer.valueOf(VERSION), null, null }, new HashMap<String, Object>());
+ }
+
+ public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
+ this(keyArray, messageMap);
+ if (!isDelete) {
+ this.messageMap.put("values", new HashMap<String, String>());
+ setSource(source);
+ setUsername(System.getProperty("user.name"));
+ setTimestamp(System.currentTimeMillis());
+
+ try {
+ setHost(InetAddress.getLocalHost().getHostAddress());
+ } catch (UnknownHostException e) {
+ log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
+ setHost("");
+ }
+ }
+
+ setVersion(VERSION);
+ }
+
+ protected void setIsDelete(boolean isDelete) {
+ this.isDelete = isDelete;
+ }
+
+ protected void setHost(String host) {
+ messageMap.put("host", host);
+ }
+
+ protected void setUsername(String username) {
+ messageMap.put("username", username);
+ }
+
+ protected void setSource(String source) {
+ messageMap.put("source", source);
+ }
+
+ protected void setTimestamp(long timestamp) {
+ messageMap.put("timestamp", Long.valueOf(timestamp));
+ }
+
+ protected void setVersion(int version) {
+ this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
+ }
+
+ protected void setType(String type) {
+ this.keyArray[TYPE_INDEX] = type;
+ }
+
+ protected void setKey(String key) {
+ this.keyArray[KEY_INDEX] = key;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Map<String, String> getMessageValues() {
+ return (Map<String, String>) this.messageMap.get("values");
+ }
+
+ protected String getMessageValue(String key) {
+ return getMessageValues().get(key);
+ }
+
+ /**
+ * @param key
+ * The key inside the messageMap, please only use human readable string (no JSON or such) - this allows
+ * easy mutation of the coordinator stream outside of Samza (scripts)
+ * @param value
+ * The value corresponding to the key, should also be a simple string
+ */
+ protected void putMessageValue(String key, String value) {
+ getMessageValues().put(key, value);
+ }
+
+ /**
+ * The type of the message is used to convert a generic
+ * CoordinatorStreamMessage into a specific message, such as a SetConfig
+ * message.
+ *
+ * @return The type of the message.
+ */
+ public String getType() {
+ return (String) this.keyArray[TYPE_INDEX];
+ }
+
+ /**
+ * @return The whole key array, including the version, type, and key of the message.
+ */
+ public Object[] getKeyArray() {
+ return this.keyArray;
+ }
+
+ /**
+ * @return Whether the message signifies a delete or not.
+ */
+ public boolean isDelete() {
+ return isDelete;
+ }
+
+ /**
+ * @return The username stored in the message map under "username".
+ */
+ public String getUsername() {
+ return (String) this.messageMap.get("username");
+ }
+
+ /**
+ * @return The timestamp stored in the message map under "timestamp".
+ */
+ public long getTimestamp() {
+ // Go through Number instead of casting to Long: a map produced by JSON deserialization may hold a small timestamp as an Integer.
+ return ((Number) this.messageMap.get("timestamp")).longValue();
+ }
+
+ /**
+ * @return An unmodifiable snapshot of the message map including header
+ * information, or null if this message is a delete.
+ */
+ public Map<String, Object> getMessageMap() {
+ if (isDelete) {
+ return null;
+ }
+ // Copy the top-level entries, then replace "values" with an unmodifiable view so callers cannot mutate the nested map either.
+ Map<String, Object> snapshot = new HashMap<String, Object>(messageMap);
+ snapshot.put("values", Collections.unmodifiableMap(getMessageValues()));
+ return Collections.unmodifiableMap(snapshot);
+ }
+
+ /**
+ * @return The source that sent the coordinator message. This is a string
+ * defined by the sender.
+ */
+ public String getSource() {
+ return (String) this.messageMap.get("source");
+ }
+
+ /**
+ * @return The protocol version that the message conforms to.
+ */
+ public int getVersion() {
+ return (Integer) this.keyArray[VERSION_INDEX];
+ }
+
+ /**
+ * @return The key for a message. The key's meaning is defined by the type of
+ * the message.
+ */
+ public String getKey() {
+ return (String) this.keyArray[KEY_INDEX];
+ }
+
+ @Override
+ public String toString() {
+ return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (isDelete ? 1231 : 1237);
+ result = prime * result + Arrays.hashCode(keyArray);
+ result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
+ if (isDelete != other.isDelete)
+ return false;
+ if (!Arrays.equals(keyArray, other.keyArray))
+ return false;
+ if (messageMap == null) {
+ if (other.messageMap != null)
+ return false;
+ } else if (!messageMap.equals(other.messageMap))
+ return false;
+ return true;
+ }
+
+ /**
+ * A coordinator stream message that tells the job coordinator to set a
+ * specific configuration.
+ */
+ public static class SetConfig extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-config";
+
+ public SetConfig(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ public SetConfig(String source, String key, String value) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ putMessageValue("value", value);
+ }
+
+ public String getConfigValue() {
+ return (String) getMessageValue("value");
+ }
+ }
+
+ public static class Delete extends CoordinatorStreamMessage {
+ public Delete(String source, String key, String type) {
+ this(source, key, type, VERSION);
+ }
+
+ /**
+ * <p>
+ * Delete messages must take the type of another CoordinatorStreamMessage
+ * (e.g. SetConfig) to define the type of message that's being deleted.
+ * Considering Kafka's log compaction, for example, the keys of a message
+ * and its delete key must match exactly:
+ * </p>
+ *
+ * <pre>
+ * k=>[1,"set-config","job.name"] .. v=> {..some stuff..}
+ * k=>[1,"set-config","job.name"] .. v=> null
+ * </pre>
+ *
+ * <p>
+ * Deletes are modeled as a CoordinatorStreamMessage with a null message
+ * map, and a key that's identical to the key map that's to be deleted.
+ * </p>
+ *
+ * @param source
+ * The source ID of the sender of the delete message.
+ * @param key
+ * The key to delete.
+ * @param type
+ * The type of message to delete. Must correspond to one of the
+ * other CoordinatorStreamMessages.
+ * @param version
+ * The protocol version.
+ */
+ public Delete(String source, String key, String type, int version) {
+ super(source);
+ setType(type);
+ setKey(key);
+ setVersion(version);
+ setIsDelete(true);
+ }
+ }
+
+ /**
+ * The SetCheckpoint is used to store the checkpoint messages for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-checkpoint
+ * Source: ContainerID
+ * MessageMap:
+ * {
+ * SSP1 : offset,
+ * SSP2 : offset
+ * }
+ * }
+ */
+ public static class SetCheckpoint extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-checkpoint";
+
+ public SetCheckpoint(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ *
+ * @param source The source writing the checkpoint
+ * @param key The key for the checkpoint message (Typically task name)
+ * @param checkpoint Checkpoint message to be written to the stream
+ */
+ public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
+ for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
+ putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
+ }
+ }
+
+ public Checkpoint getCheckpoint() {
+ Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
+ for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
+ offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
+ }
+ return new Checkpoint(offsetMap);
+ }
+ }
+
+ /**
+ * The SetChangelogMapping is used to store the changelog partition information for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-changelog
+ * Source: ContainerID
+ * MessageMap:
+ * {
+ * "Partition" : partitionNumber (The key is just a dummy key here, the value contains the actual partition)
+ * }
+ * }
+ */
+ public static class SetChangelogMapping extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-changelog";
+
+ public SetChangelogMapping(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ *
+ * @param source Source writing the change log mapping
+ * @param taskName The task name to be used in the mapping
+ * @param changelogPartitionNumber The partition to which the task's changelog is mapped to
+ */
+ public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
+ super(source);
+ setType(TYPE);
+ setKey(taskName);
+ putMessageValue("Partition", String.valueOf(changelogPartitionNumber));
+ }
+
+ public String getTaskName() {
+ return getKey();
+ }
+
+ public int getPartition() {
+ return Integer.parseInt(getMessageValue("Partition"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..2134603
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemConsumer that provides helpful methods for dealing
+ * with the coordinator stream.
+ */
+public class CoordinatorStreamSystemConsumer {
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemConsumer.class);
+
+ private final Serde<List<?>> keySerde;
+ private final Serde<Map<String, Object>> messageSerde;
+ private final SystemStreamPartition coordinatorSystemStreamPartition;
+ private final SystemConsumer systemConsumer;
+ private final SystemAdmin systemAdmin;
+ private final Map<String, String> configMap;
+ private boolean isBootstrapped;
+ private boolean isStarted;
+ private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new HashSet<CoordinatorStreamMessage>();
+
+ public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+ this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+ this.systemConsumer = systemConsumer;
+ this.systemAdmin = systemAdmin;
+ this.configMap = new HashMap<String, String>();
+ this.isBootstrapped = false;
+ this.keySerde = keySerde;
+ this.messageSerde = messageSerde;
+ }
+
+ public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+ this(coordinatorSystemStream, systemConsumer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+ }
+
+ /**
+ * Retrieves the oldest offset in the coordinator stream, and registers the
+ * coordinator stream with the SystemConsumer using the earliest offset.
+ */
+ public void register() {
+ log.debug("Attempting to register: {}", coordinatorSystemStreamPartition);
+ Set<String> streamNames = new HashSet<String>();
+ String streamName = coordinatorSystemStreamPartition.getStream();
+ streamNames.add(streamName);
+ Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+
+ if (systemStreamMetadataMap == null) {
+ throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal.");
+ }
+
+ SystemStreamMetadata systemStreamMetadata = systemStreamMetadataMap.get(streamName);
+
+ if (systemStreamMetadata == null) {
+ throw new SamzaException("Expected " + streamName + " to be in system stream metadata.");
+ }
+
+ SystemStreamPartitionMetadata systemStreamPartitionMetadata = systemStreamMetadata.getSystemStreamPartitionMetadata().get(coordinatorSystemStreamPartition.getPartition());
+
+ if (systemStreamPartitionMetadata == null) {
+ throw new SamzaException("Expected metadata for " + coordinatorSystemStreamPartition + " to exist.");
+ }
+
+ String startingOffset = systemStreamPartitionMetadata.getOldestOffset();
+ log.debug("Registering {} with offset {}", coordinatorSystemStreamPartition, startingOffset);
+ systemConsumer.register(coordinatorSystemStreamPartition, startingOffset);
+ }
+
+ /**
+ * Starts the underlying SystemConsumer.
+ */
+ public void start() {
+ if(isStarted)
+ {
+ log.info("Coordinator stream consumer already started");
+ return;
+ }
+ log.info("Starting coordinator stream system consumer.");
+ systemConsumer.start();
+ isStarted = true;
+ }
+
+ /**
+ * Stops the underlying SystemConsumer and clears the started flag, so that
+ * a later call to start() starts the consumer again instead of returning
+ * early.
+ */
+ public void stop() {
+ log.info("Stopping coordinator stream system consumer.");
+ systemConsumer.stop();
+ isStarted = false;
+ }
+
+ /**
+ * Read all messages from the earliest offset, all the way to the latest.
+ * Every message read is added to the bootstrapped message set. In addition,
+ * set-config messages are applied to the config map: a delete removes the
+ * config key, any other message stores its config value. Any exception
+ * raised while reading is rethrown wrapped in a SamzaException.
+ */
+ public void bootstrap() {
+ log.info("Bootstrapping configuration from coordinator stream.");
+ SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+
+ try {
+ while (iterator.hasNext()) {
+ IncomingMessageEnvelope envelope = iterator.next();
+ Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
+ // A null message body leaves valueMap null, which the CoordinatorStreamMessage constructor treats as a delete.
+ Map<String, Object> valueMap = null;
+ if (envelope.getMessage() != null) {
+ valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
+ }
+ CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
+ log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+ // NOTE(review): set membership uses full message equality (key array and message map), so an older message with the same key is not replaced here.
+ bootstrappedStreamSet.add(coordinatorStreamMessage);
+ if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+ String configKey = coordinatorStreamMessage.getKey();
+ if (coordinatorStreamMessage.isDelete()) {
+ configMap.remove(configKey);
+ } else {
+ String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
+ configMap.put(configKey, configValue);
+ }
+ }
+ }
+ log.debug("Bootstrapped configuration: {}", configMap);
+ isBootstrapped = true;
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ public Set<CoordinatorStreamMessage> getBoostrappedStream() {
+ log.info("Returning the bootstrapped data from the stream");
+ if(!isBootstrapped)
+ bootstrap();
+ return bootstrappedStreamSet;
+ }
+
+ public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
+ log.debug("Bootstrapping coordinator stream for messages of type {}", type);
+ bootstrap();
+ HashSet<CoordinatorStreamMessage> bootstrappedStream = new HashSet<CoordinatorStreamMessage>();
+ for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
+ if(type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
+ bootstrappedStream.add(coordinatorStreamMessage);
+ }
+ }
+ return bootstrappedStream;
+ }
+
+ /**
+ * @return The bootstrapped configuration that's been read after bootstrap has
+ * been invoked.
+ */
+ public Config getConfig() {
+ if (isBootstrapped) {
+ return new MapConfig(configMap);
+ } else {
+ throw new SamzaException("Must call bootstrap before retrieving config.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..0f3e10e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemProducer that provides helpful methods for dealing
+ * with the coordinator stream.
+ */
+public class CoordinatorStreamSystemProducer {
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemProducer.class);
+
+ private final Serde<List<?>> keySerde;
+ private final Serde<Map<String, Object>> messageSerde;
+ private final SystemStream systemStream;
+ private final SystemProducer systemProducer;
+ private final SystemAdmin systemAdmin;
+ private boolean isStarted;
+
+ public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
+ this(systemStream, systemProducer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+ }
+
+ public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+ this.systemStream = systemStream;
+ this.systemProducer = systemProducer;
+ this.systemAdmin = systemAdmin;
+ this.keySerde = keySerde;
+ this.messageSerde = messageSerde;
+ }
+
+ /**
+ * Registers a source with the underlying SystemProducer.
+ *
+ * @param source
+ * The source to register.
+ */
+ public void register(String source) {
+ systemProducer.register(source);
+ }
+
+ /**
+ * Starts the underlying SystemProducer, unless it has already been started.
+ */
+ public void start() {
+ if(isStarted)
+ {
+ log.info("Coordinator stream producer already started");
+ return;
+ }
+ log.info("Starting coordinator stream producer.");
+ systemProducer.start();
+ isStarted = true;
+ }
+
+ /**
+ * Stops the underlying SystemProducer and clears the started flag, so that
+ * a later call to start() starts the producer again instead of returning
+ * early.
+ */
+ public void stop() {
+ log.info("Stopping coordinator stream producer.");
+ systemProducer.stop();
+ isStarted = false;
+ }
+
+ /**
+ * Serialize and send a coordinator stream message.
+ *
+ * @param message
+ * The message to send.
+ */
+ public void send(CoordinatorStreamMessage message) {
+ log.debug("Sending {}", message);
+ try {
+ String source = message.getSource();
+ byte[] key = keySerde.toBytes(Arrays.asList(message.getKeyArray()));
+ byte[] value = null;
+ if (!message.isDelete()) {
+ value = messageSerde.toBytes(message.getMessageMap());
+ }
+ OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(systemStream, Integer.valueOf(0), key, value);
+ systemProducer.send(source, envelope);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Helper method that sends a series of SetConfig messages to the coordinator
+ * stream.
+ *
+ * @param source
+ * An identifier to denote which source is sending a message. This
+ * can be any arbitrary string.
+ * @param config
+ * The config object to store in the coordinator stream.
+ */
+ public void writeConfig(String source, Config config) {
+ log.debug("Writing config: {}", config);
+ for (Map.Entry<String, String> configPair : config.entrySet()) {
+ send(new CoordinatorStreamMessage.SetConfig(source, configPair.getKey(), configPair.getValue()));
+ }
+ systemProducer.flush(source);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index eb22d2e..c26690a 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -20,11 +20,13 @@
package org.apache.samza.job.model;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
+
/**
* <p>
* The data model used to represent a task. The model is used in the job
@@ -39,12 +41,12 @@ import org.apache.samza.system.SystemStreamPartition;
*/
public class TaskModel implements Comparable<TaskModel> {
private final TaskName taskName;
- private final Set<SystemStreamPartition> systemStreamPartitions;
+ private final Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets;
private final Partition changelogPartition;
- public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+ public TaskModel(TaskName taskName, Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, Partition changelogPartition) {
this.taskName = taskName;
- this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
+ this.systemStreamPartitionsToOffsets = Collections.unmodifiableMap(systemStreamPartitionsToOffsets);
this.changelogPartition = changelogPartition;
}
@@ -53,55 +55,55 @@ public class TaskModel implements Comparable<TaskModel> {
}
public Set<SystemStreamPartition> getSystemStreamPartitions() {
- return systemStreamPartitions;
+ return systemStreamPartitionsToOffsets.keySet();
}
public Partition getChangelogPartition() {
return changelogPartition;
}
- @Override
- public String toString() {
- return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
+ public Map<SystemStreamPartition, String> getCheckpointedOffsets() {
+ return systemStreamPartitionsToOffsets;
}
@Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((changelogPartition == null) ? 0 : changelogPartition.hashCode());
- result = prime * result + ((systemStreamPartitions == null) ? 0 : systemStreamPartitions.hashCode());
- result = prime * result + ((taskName == null) ? 0 : taskName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
- TaskModel other = (TaskModel) obj;
- if (changelogPartition == null) {
- if (other.changelogPartition != null)
- return false;
- } else if (!changelogPartition.equals(other.changelogPartition))
+ }
+
+ TaskModel taskModel = (TaskModel) o;
+
+ if (!changelogPartition.equals(taskModel.changelogPartition)) {
return false;
- if (systemStreamPartitions == null) {
- if (other.systemStreamPartitions != null)
- return false;
- } else if (!systemStreamPartitions.equals(other.systemStreamPartitions))
+ }
+ if (!systemStreamPartitionsToOffsets.equals(taskModel.systemStreamPartitionsToOffsets)) {
return false;
- if (taskName == null) {
- if (other.taskName != null)
- return false;
- } else if (!taskName.equals(other.taskName))
+ }
+ if (!taskName.equals(taskModel.taskName)) {
return false;
+ }
+
return true;
}
+ @Override
+ public int hashCode() {
+ int result = taskName.hashCode();
+ result = 31 * result + systemStreamPartitionsToOffsets.hashCode();
+ result = 31 * result + changelogPartition.hashCode();
+ return result;
+ }
+
+ @Override
+
+ public String toString() {
+ return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitionsToOffsets.keySet() + ", changeLogPartition=" + changelogPartition + "]";
+ }
+
public int compareTo(TaskModel other) {
return taskName.compareTo(other.getTaskName());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 7dc431c..172358a 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -19,7 +19,7 @@
package org.apache.samza.serializers.model;
-import java.util.Set;
+import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
@@ -31,14 +31,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
*/
public abstract class JsonTaskModelMixIn {
@JsonCreator
- public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
+ public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions-offsets") Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, @JsonProperty("changelog-partition") Partition changelogPartition) {
}
@JsonProperty("task-name")
abstract TaskName getTaskName();
- @JsonProperty("system-stream-partitions")
- abstract Set<SystemStreamPartition> getSystemStreamPartitions();
+ @JsonProperty("system-stream-partitions-offsets")
+ abstract Map<SystemStreamPartition, String> getCheckpointedOffsets();
@JsonProperty("changelog-partition")
abstract Partition getChangelogPartition();
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 3517912..17410c5 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -29,7 +29,9 @@ import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
@@ -39,9 +41,11 @@ import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.DeserializationContext;
import org.codehaus.jackson.map.JsonDeserializer;
import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.KeyDeserializer;
import org.codehaus.jackson.map.MapperConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.introspect.AnnotatedField;
import org.codehaus.jackson.map.introspect.AnnotatedMethod;
@@ -76,9 +80,11 @@ public class SamzaObjectMapper {
// Setup custom serdes for simple data types.
module.addSerializer(Partition.class, new PartitionSerializer());
module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
+ module.addKeySerializer(SystemStreamPartition.class, new SystemStreamPartitionKeySerializer());
module.addSerializer(TaskName.class, new TaskNameSerializer());
module.addDeserializer(Partition.class, new PartitionDeserializer());
module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
+ module.addKeyDeserializer(SystemStreamPartition.class, new SystemStreamPartitionKeyDeserializer());
module.addDeserializer(Config.class, new ConfigDeserializer());
// Setup mixins for data models.
@@ -88,6 +94,7 @@ public class SamzaObjectMapper {
mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+ mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false);
// Convert camel case to hyphenated field names, and register the module.
mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
@@ -138,6 +145,23 @@ public class SamzaObjectMapper {
}
}
+ public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
+ @Override
+ public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider)
+ throws IOException {
+ String ssp = Util.sspToString(systemStreamPartition);
+ jgen.writeFieldName(ssp);
+ }
+ }
+
+ public static class SystemStreamPartitionKeyDeserializer extends KeyDeserializer {
+ @Override
+ public Object deserializeKey(String sspString, DeserializationContext ctxt)
+ throws IOException {
+ return Util.stringToSsp(sspString);
+ }
+ }
+
public static class SystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> {
@Override
public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
new file mode 100644
index 0000000..fff7634
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The Changelog manager is used to persist and read the changelog information from the coordinator stream.
+ */
+public class ChangelogPartitionManager {
+
+ private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
+ private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+ private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+ private boolean isCoordinatorConsumerRegistered = false;
+ private String source;
+
+ public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+ this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+ this.coordinatorStreamProducer = coordinatorStreamProducer;
+ this.source = "Unknown";
+ }
+
+ public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+ String source) {
+ this(coordinatorStreamProducer, coordinatorStreamConsumer);
+ this.source = source;
+ }
+
+ public void start() {
+ coordinatorStreamProducer.start();
+ coordinatorStreamConsumer.start();
+ }
+
+ public void stop() {
+ coordinatorStreamConsumer.stop();
+ coordinatorStreamProducer.stop();
+ }
+
+ /**
+ * Registers this manager to write changelog mapping for a particular task.
+ * @param taskName The taskname to be registered for changelog mapping.
+ */
+ public void register(TaskName taskName) {
+ log.debug("Adding taskName {} to {}", taskName, this);
+ if(!isCoordinatorConsumerRegistered) {
+ coordinatorStreamConsumer.register();
+ isCoordinatorConsumerRegistered = true;
+ }
+ coordinatorStreamProducer.register(taskName.getTaskName());
+ }
+
+ /**
+ * Read the taskName to partition mapping that is being maintained by this ChangelogPartitionManager.
+ * @return TaskName to change log partition mapping, or an empty map if there were no messages.
+ */
+ public Map<TaskName, Integer> readChangeLogPartitionMapping() {
+ log.debug("Reading changelog partition information");
+ Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetChangelogMapping.TYPE);
+ HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
+ for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+ SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
+ changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
+ log.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
+ }
+ return changelogMapping;
+ }
+
+ /**
+ * Write the taskName to partition mapping that is being maintained by this ChangelogPartitionManager.
+ * @param changelogEntries The entries that need to be written to the coordinator stream; the map takes the taskName
+ * and its corresponding changelog partition.
+ */
+ public void writeChangeLogPartitionMapping(Map<TaskName, Integer> changelogEntries) {
+ log.debug("Updating changelog information with: ");
+ for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
+ log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
+ SetChangelogMapping changelogMapping = new SetChangelogMapping(source,
+ entry.getKey().getTaskName(),
+ entry.getValue());
+ coordinatorStreamProducer.send(changelogMapping);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index ddc30af..2e3aeb8 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -26,14 +26,18 @@ import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config.{Config, StreamConfig}
import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, Util}
+import org.apache.samza.util.CommandLine
import org.apache.samza.{Partition, SamzaException}
import scala.collection.JavaConversions._
import org.apache.samza.util.Logging
import org.apache.samza.coordinator.JobCoordinator
+import scala.collection.immutable.HashMap
+
+
/**
* Command-line tool for inspecting and manipulating the checkpoints for a job.
* This can be used, for example, to force a job to re-process a stream from the
@@ -113,31 +117,30 @@ object CheckpointTool {
}
}
+ def apply(config: Config, offsets: TaskNameToCheckpointMap) = {
+ val factory = new CoordinatorStreamSystemFactory
+ val coordinatorStreamConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap())
+ val coordinatorStreamProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap())
+ val manager = new CheckpointManager(coordinatorStreamProducer, coordinatorStreamConsumer, "checkpoint-tool")
+ new CheckpointTool(config, offsets, manager)
+ }
+
def main(args: Array[String]) {
val cmdline = new CheckpointToolCommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
- val tool = new CheckpointTool(config, cmdline.newOffsets)
+ val tool = CheckpointTool(config, cmdline.newOffsets)
tool.run
}
}
-class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extends Logging {
- val manager = config.getCheckpointManagerFactory match {
- case Some(className) =>
- Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
- case _ =>
- throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
- }
-
- // The CheckpointManagerFactory needs to perform this same operation when initializing
- // the manager. TODO figure out some way of avoiding duplicated work.
+class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manager: CheckpointManager) extends Logging {
def run {
info("Using %s" format manager)
// Find all the TaskNames that would be generated for this job config
- val coordinator = JobCoordinator(config, 1)
+ val coordinator = JobCoordinator(config)
val taskNames = coordinator
.jobModel
.getContainers
@@ -165,7 +168,10 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extend
/** Load the most recent checkpoint state for a specified TaskName. */
def readLastCheckpoint(taskName:TaskName): Map[SystemStreamPartition, String] = {
- manager.readLastCheckpoint(taskName).getOffsets.toMap
+ Option(manager.readLastCheckpoint(taskName))
+ .getOrElse(new Checkpoint(new HashMap[SystemStreamPartition, String]()))
+ .getOffsets
+ .toMap
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 85c1749..20e5d26 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,8 @@ object OffsetManager extends Logging {
config: Config,
checkpointManager: CheckpointManager = null,
systemAdmins: Map[String, SystemAdmin] = Map(),
- offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics) = {
-
+ offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+ latestOffsets: Map[SystemStreamPartition, String] = Map()) = {
debug("Building offset manager for %s." format systemStreamMetadata)
val offsetSettings = systemStreamMetadata
@@ -99,7 +99,7 @@ object OffsetManager extends Logging {
// Build OffsetSetting so we can create a map for OffsetManager.
(systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
}.toMap
- new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
+ new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics, latestOffsets)
}
}
@@ -142,12 +142,19 @@ class OffsetManager(
/**
* offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
*/
- val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics ) extends Logging {
+ val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+
+ /*
+ * The previously read checkpoints restored from the coordinator stream
+ */
+ val previousCheckpointedOffsets: Map[SystemStreamPartition, String] = Map()
+ ) extends Logging {
/**
* Last offsets processed for each SystemStreamPartition.
*/
- var lastProcessedOffsets = Map[SystemStreamPartition, String]()
+ // Filter out null offset values: we can't use them, and they exist only because of SSP information
+ var lastProcessedOffsets = previousCheckpointedOffsets.filter(_._2 != null)
/**
* Offsets to start reading from for each SystemStreamPartition. This
@@ -170,7 +177,8 @@ class OffsetManager(
def start {
registerCheckpointManager
- loadOffsetsFromCheckpointManager
+ initializeCheckpointManager
+ loadOffsets
stripResetStreams
loadStartingOffsets
loadDefaults
@@ -240,18 +248,21 @@ class OffsetManager(
}
}
+ private def initializeCheckpointManager {
+ if (checkpointManager != null) {
+ checkpointManager.start
+ } else {
+ debug("Skipping offset load from checkpoint manager because no manager was defined.")
+ }
+ }
+
/**
* Loads last processed offsets from checkpoint manager for all registered
* partitions.
*/
- private def loadOffsetsFromCheckpointManager {
- if (checkpointManager != null) {
- debug("Loading offsets from checkpoint manager.")
-
- checkpointManager.start
-
- lastProcessedOffsets ++= systemStreamPartitions.keys
- .flatMap(restoreOffsetsFromCheckpoint(_)).filter {
+ private def loadOffsets {
+ debug("Loading offsets")
+ lastProcessedOffsets.filter {
case (systemStreamPartition, offset) =>
val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
if (!shouldKeep) {
@@ -260,26 +271,7 @@ class OffsetManager(
info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
shouldKeep
}
- } else {
- debug("Skipping offset load from checkpoint manager because no manager was defined.")
- }
- }
-
- /**
- * Loads last processed offsets for a single taskName.
- */
- private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[SystemStreamPartition, String] = {
- debug("Loading checkpoints for taskName: %s." format taskName)
-
- val checkpoint = checkpointManager.readLastCheckpoint(taskName)
- if (checkpoint != null) {
- checkpoint.getOffsets.toMap
- } else {
- info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-
- Map()
- }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
deleted file mode 100644
index 2a87a6e..0000000
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.file
-
-import java.io.File
-import java.io.FileNotFoundException
-import java.io.FileOutputStream
-import java.util
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.serializers.CheckpointSerde
-import scala.io.Source
-
-class FileSystemCheckpointManager(
- jobName: String,
- root: File,
- serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager {
-
- override def register(taskName: TaskName):Unit = Unit
-
- def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
-
- def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
- val bytes = serde.toBytes(checkpoint)
- val fos = new FileOutputStream(getCheckpointFile(taskName))
-
- fos.write(bytes)
- fos.close
- }
-
- def readLastCheckpoint(taskName: TaskName): Checkpoint = {
- try {
- val bytes = Source.fromFile(getCheckpointFile(taskName)).map(_.toByte).toArray
-
- serde.fromBytes(bytes)
- } catch {
- case e: FileNotFoundException => null
- }
- }
-
- def start {
- if (!root.exists) {
- throw new SamzaException("Root directory for file system checkpoint manager does not exist: %s" format root)
- }
- }
-
- def stop {}
-
- private def getFile(jobName: String, taskName: TaskName, fileType:String) =
- new File(root, "%s-%s-%s" format (jobName, taskName, fileType))
-
- private def getChangeLogPartitionMappingFile() = getFile(jobName, new TaskName("partition-mapping"), "changelog-partition-mapping")
-
- override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
- try {
- val bytes = Source.fromFile(getChangeLogPartitionMappingFile()).map(_.toByte).toArray
- serde.changelogPartitionMappingFromBytes(bytes)
- } catch {
- case e: FileNotFoundException => new util.HashMap[TaskName, java.lang.Integer]()
- }
- }
-
- def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = {
- val hashmap = new util.HashMap[TaskName, java.lang.Integer](mapping)
- val bytes = serde.changelogPartitionMappingToBytes(hashmap)
- val fos = new FileOutputStream(getChangeLogPartitionMappingFile())
-
- fos.write(bytes)
- fos.close
- }
-}
-
-class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
- def getCheckpointManager(config: Config, registry: MetricsRegistry) = {
- val name = config
- .getName
- .getOrElse(throw new SamzaException("Missing job name in configs"))
- val root = config
- .getFileSystemCheckpointRoot
- .getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
- new FileSystemCheckpointManager(name, new File(root))
- }
-}