You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/13 22:09:43 UTC
[1/2] git commit: SAMZA-182;
use HADOOP_YARN_HOME instead of YARN_HOME in run-class.sh.
Repository: incubator-samza
Updated Branches:
refs/heads/master c932c5029 -> 3a6f2555a
SAMZA-182; use HADOOP_YARN_HOME instead of YARN_HOME in run-class.sh.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/782a0923
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/782a0923
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/782a0923
Branch: refs/heads/master
Commit: 782a0923652b712af84d81dc6688a3880ab040b7
Parents: c932c50
Author: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Authored: Thu Mar 13 14:06:14 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Mar 13 14:06:14 2014 -0700
----------------------------------------------------------------------
samza-shell/src/main/bash/run-class.sh | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/782a0923/samza-shell/src/main/bash/run-class.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 2fa2acf..5aa34cd 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -33,8 +33,9 @@ if [ ! -d "$base_dir/lib" ]; then
exit 1
fi
-YARN_HOME="${YARN_HOME:-$HOME/.samza}"
-CLASSPATH=$YARN_HOME/conf
+HADOOP_YARN_HOME="${HADOOP_YARN_HOME:-$HOME/.samza}"
+HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_YARN_HOME/conf}"
+CLASSPATH=$HADOOP_CONF_DIR
for file in $base_dir/lib/*.[jw]ar;
do
[2/2] git commit: SAMZA-157;
add samza.offset.default configuration for streams and systems.
Posted by cr...@apache.org.
SAMZA-157; add samza.offset.default configuration for streams and systems.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3a6f2555
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3a6f2555
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3a6f2555
Branch: refs/heads/master
Commit: 3a6f2555aa92d47dd85b54f3f1503719c08d31a7
Parents: 782a092
Author: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Authored: Thu Mar 13 14:08:46 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Mar 13 14:08:46 2014 -0700
----------------------------------------------------------------------
.../org/apache/samza/system/SystemAdmin.java | 14 +-
.../samza/system/SystemStreamMetadata.java | 50 +++
...inglePartitionWithoutOffsetsSystemAdmin.java | 12 +
.../apache/samza/checkpoint/OffsetManager.scala | 351 +++++++++++++++++++
.../org/apache/samza/config/StreamConfig.scala | 4 +
.../org/apache/samza/config/SystemConfig.scala | 3 +
.../apache/samza/container/SamzaContainer.scala | 47 +--
.../apache/samza/container/TaskInstance.scala | 72 ++--
.../system/chooser/BootstrappingChooser.scala | 19 +-
.../samza/checkpoint/TestOffsetManager.scala | 188 ++++++++++
.../samza/container/TestTaskInstance.scala | 28 +-
.../samza/system/kafka/KafkaSystemAdmin.scala | 16 +-
.../system/kafka/TestKafkaSystemAdmin.scala | 19 +-
.../samza/system/mock/MockSystemAdmin.java | 12 +
.../test/integration/TestStatefulTask.scala | 20 +-
15 files changed, 737 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 437bfb2..3976253 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -29,9 +29,21 @@ import java.util.Set;
* system.
*/
public interface SystemAdmin {
+
+ /**
+ * Fetches the offsets for the messages immediately after the supplied offsets
+ * for a group of SystemStreamPartitions.
+ *
+ * @param offsets
+ * Map from SystemStreamPartition to current offsets.
+ * @return Map from SystemStreamPartition to offsets immediately after the
+ * current offsets.
+ */
+ Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets);
+
/**
* Fetch metadata from a system for a set of streams.
- *
+ *
* @param streamNames
* The streams to to fetch metadata for.
* @return A map from stream name to SystemStreamMetadata for each stream
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
index 32e142a..817c557 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
@@ -21,7 +21,9 @@ package org.apache.samza.system;
import java.util.Collections;
import java.util.Map;
+
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
/**
* SystemAdmins use this class to return useful metadata about a stream's offset
@@ -139,6 +141,23 @@ public class SystemStreamMetadata {
return upcomingOffset;
}
+ /**
+ * @param offsetType
+ * The type of offset to get. Either oldest, newest, or upcoming.
+ * @return The corresponding offset for the offset type requested.
+ */
+ public String getOffset(OffsetType offsetType) {
+ if (offsetType.equals(OffsetType.OLDEST)) {
+ return getOldestOffset();
+ } else if (offsetType.equals(OffsetType.NEWEST)) {
+ return getNewestOffset();
+ } else if (offsetType.equals(OffsetType.UPCOMING)) {
+ return getUpcomingOffset();
+ } else {
+ throw new SamzaException("Invalid offset type defined " + offsetType + ".");
+ }
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -181,4 +200,35 @@ public class SystemStreamMetadata {
return "SystemStreamPartitionMetadata [oldestOffset=" + oldestOffset + ", newestOffset=" + newestOffset + ", upcomingOffset=" + upcomingOffset + "]";
}
}
+
+ /**
+ * OffsetType is an enum used to define which offset should be used when
+ * reading from a SystemStreamPartition for the first time.
+ */
+ public enum OffsetType {
+
+ /**
+ * Signals the offset of the oldest message in a SystemStreamPartition.
+ */
+ OLDEST("oldest"),
+
+ /**
+ * Signals the offset of the newest message in a SystemStreamPartition.
+ */
+ NEWEST("newest"),
+
+ /**
+ * Signals the offset of the next message to be written into a
+ * SystemStreamPartition. If the offset of the most recent message written
+ * to a SystemStreamPartition is 7, then upcoming would signal offset 8
+ * (assuming the offsets were incremental).
+ */
+ UPCOMING("upcoming");
+
+ private final String offsetType;
+
+ private OffsetType(String offsetType) {
+ this.offsetType = offsetType;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 44fd82a..38e313f 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -26,6 +26,7 @@ import org.apache.samza.Partition;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
/**
* A simple helper admin class that defines a single partition (partition 0) for
@@ -51,4 +52,15 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
return metadata;
}
+
+ @Override
+ public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+ Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
+
+ for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+ offsetsAfter.put(systemStreamPartition, null);
+ }
+
+ return offsetsAfter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
new file mode 100644
index 0000000..80341c8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import org.apache.samza.system.SystemStream
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamMetadata.OffsetType
+import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+import org.apache.samza.config.Config
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.system.SystemAdmin
+
+/**
+ * OffsetSetting encapsulates a SystemStream's metadata, default offset, and
+ * reset offset settings. It's just a convenience class to make OffsetManager
+ * easier to work with.
+ */
+case class OffsetSetting(
+ /**
+ * The metadata for the SystemStream.
+ */
+ metadata: SystemStreamMetadata,
+
+ /**
+ * The default offset (oldest, newest, or upcoming) for the SystemStream.
+ * This setting is used when no checkpoint is available for a SystemStream
+ * if the job is starting for the first time, or the SystemStream has been
+ * reset (see resetOffsets, below).
+ */
+ defaultOffset: OffsetType,
+
+ /**
+ * Whether the SystemStream's offset should be reset or not. Determines
+ * whether an offset should be ignored at initialization time, even if a
+ * checkpoint is available. This is useful for jobs that wish to restart
+ * reading from a stream at a different position than where they last
+ * checkpointed. If this is true, then defaultOffset will be used to find
+ * the new starting position in the stream.
+ */
+ resetOffset: Boolean)
+
+/**
+ * OffsetManager object is a helper that does wiring to build an OffsetManager
+ * from a config object.
+ */
+object OffsetManager extends Logging {
+ /**
+ * Builds an OffsetManager, resolving the default offset type and the reset
+ * setting of every SystemStream from config.
+ *
+ * A stream-level default offset takes precedence over a system-level one.
+ * When neither is configured, OffsetType.UPCOMING is used.
+ *
+ * @param systemStreamMetadata Metadata for every SystemStream to manage.
+ * @param config Config to read default offset and reset offset settings from.
+ * @param checkpointManager Optional checkpoint manager. May be null.
+ * @param systemAdmins Map from system name to the SystemAdmin for that system.
+ * @return An OffsetManager holding one OffsetSetting per SystemStream.
+ */
+ def apply(
+ systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
+ config: Config,
+ checkpointManager: CheckpointManager = null,
+ systemAdmins: Map[String, SystemAdmin] = Map()) = {
+
+ debug("Building offset manager for %s." format systemStreamMetadata)
+
+ val offsetSettings = systemStreamMetadata
+ .map {
+ case (systemStream, systemStreamMetadata) =>
+ // Get default offset. Locale.ROOT keeps the upper-casing independent of
+ // the JVM's default locale, so "upcoming" always maps to UPCOMING.
+ val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
+ val systemDefaultOffset = config.getDefaultSystemOffset(systemStream.getSystem)
+ val defaultOffsetType = if (streamDefaultOffset.isDefined) {
+ OffsetType.valueOf(streamDefaultOffset.get.toUpperCase(java.util.Locale.ROOT))
+ } else if (systemDefaultOffset.isDefined) {
+ OffsetType.valueOf(systemDefaultOffset.get.toUpperCase(java.util.Locale.ROOT))
+ } else {
+ // The log message must name the offset type that is actually used.
+ info("No default offset for %s defined. Using upcoming." format systemStream)
+ OffsetType.UPCOMING
+ }
+ debug("Using default offset %s for %s." format (defaultOffsetType, systemStream))
+
+ // Get reset offset.
+ val resetOffset = config.getResetOffset(systemStream)
+ debug("Using reset offset %s for %s." format (resetOffset, systemStream))
+
+ // Build OffsetSetting so we can create a map for OffsetManager.
+ (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
+ }.toMap
+ new OffsetManager(offsetSettings, checkpointManager, systemAdmins)
+ }
+}
+
+/**
+ * OffsetManager does several things:
+ *
+ * <ul>
+ * <li>Loads last checkpointed offset for all input SystemStreamPartitions in a
+ * SamzaContainer.</li>
+ * <li>Uses last checkpointed offset to figure out the next offset to start
+ * reading from for each input SystemStreamPartition in a SamzaContainer</li>
+ * <li>Keep track of the last processed offset for each SystemStreamPartitions
+ * in a SamzaContainer.</li>
+ * <li>Checkpoints the last processed offset for each SystemStreamPartitions
+ * in a SamzaContainer periodically to the CheckpointManager.</li>
+ * </ul>
+ *
+ * All partitions must be registered before start is called, and start must be
+ * called before get/update/checkpoint/stop are called.
+ */
+class OffsetManager(
+
+ /**
+ * Offset settings for all streams that the OffsetManager is managing.
+ */
+ val offsetSettings: Map[SystemStream, OffsetSetting] = Map(),
+
+ /**
+ * Optional checkpoint manager for checkpointing offsets whenever
+ * checkpoint is called.
+ */
+ val checkpointManager: CheckpointManager = null,
+
+ /**
+ * SystemAdmins that are used to get next offsets from last checkpointed
+ * offsets. Map is from system name to SystemAdmin class for the system.
+ */
+ val systemAdmins: Map[String, SystemAdmin] = Map()) extends Logging {
+
+ /**
+ * Last offsets processed for each SystemStreamPartition.
+ */
+ var lastProcessedOffsets = Map[SystemStreamPartition, String]()
+
+ /**
+ * Offsets to start reading from for each SystemStreamPartition. This
+ * variable is populated after all checkpoints have been restored.
+ */
+ var startingOffsets = Map[SystemStreamPartition, String]()
+
+ /**
+ * The set of system stream partitions that have been registered with the
+ * OffsetManager. These are the SSPs that will be tracked within the offset
+ * manager.
+ */
+ var systemStreamPartitions = Set[SystemStreamPartition]()
+
+ def register(systemStreamPartition: SystemStreamPartition) {
+ systemStreamPartitions += systemStreamPartition
+ }
+
+ def start {
+ registerCheckpointManager
+ loadOffsetsFromCheckpointManager
+ stripResetStreams
+ loadStartingOffsets
+ loadDefaults
+
+ info("Successfully loaded last processed offsets: %s" format lastProcessedOffsets)
+ info("Successfully loaded starting offsets: %s" format startingOffsets)
+ }
+
+ /**
+ * Set the last processed offset for a given SystemStreamPartition.
+ */
+ def update(systemStreamPartition: SystemStreamPartition, offset: String) {
+ lastProcessedOffsets += systemStreamPartition -> offset
+ }
+
+ /**
+ * Get the last processed offset for a SystemStreamPartition.
+ */
+ def getLastProcessedOffset(systemStreamPartition: SystemStreamPartition) = {
+ lastProcessedOffsets.get(systemStreamPartition)
+ }
+
+ /**
+ * Get the starting offset for a SystemStreamPartition. This is the offset
+ * where a SamzaContainer begins reading from when it starts up.
+ */
+ def getStartingOffset(systemStreamPartition: SystemStreamPartition) = {
+ startingOffsets.get(systemStreamPartition)
+ }
+
+ /**
+ * Checkpoint all offsets for a given partition using the CheckpointManager.
+ */
+ def checkpoint(partition: Partition) {
+ if (checkpointManager != null) {
+ debug("Checkpointing offsets for partition %s." format partition)
+
+ val partitionOffsets = lastProcessedOffsets
+ .filterKeys(_.getPartition.equals(partition))
+ .map { case (systemStreamPartition, offset) => (systemStreamPartition.getSystemStream, offset) }
+ .toMap
+
+ checkpointManager.writeCheckpoint(partition, new Checkpoint(partitionOffsets))
+ } else {
+ debug("Skipping checkpointing for partition %s because no checkpoint manager is defined." format partition)
+ }
+ }
+
+ def stop {
+ if (checkpointManager != null) {
+ debug("Shutting down checkpoint manager.")
+
+ checkpointManager.stop
+ } else {
+ debug("Skipping checkpoint manager shutdown because no checkpoint manager is defined.")
+ }
+ }
+
+ /**
+ * Returns a set of partitions that have been registered with this offset
+ * manager.
+ */
+ private def getPartitions = {
+ systemStreamPartitions
+ .map(_.getPartition)
+ .toSet
+ }
+
+ /**
+ * Register all partitions with the CheckpointManager.
+ */
+ private def registerCheckpointManager {
+ if (checkpointManager != null) {
+ debug("Registering checkpoint manager.")
+
+ getPartitions.foreach(checkpointManager.register)
+ } else {
+ debug("Skipping checkpoint manager registration because no manager was defined.")
+ }
+ }
+
+ /**
+ * Loads last processed offsets from checkpoint manager for all registered
+ * partitions.
+ */
+ private def loadOffsetsFromCheckpointManager {
+ if (checkpointManager != null) {
+ debug("Loading offsets from checkpoint manager.")
+
+ checkpointManager.start
+
+ lastProcessedOffsets ++= getPartitions.flatMap(restoreOffsetsFromCheckpoint(_))
+ } else {
+ debug("Skipping offset load from checkpoint manager because no manager was defined.")
+ }
+ }
+
+ /**
+ * Loads last processed offsets for a single partition.
+ *
+ * @param partition The partition whose last checkpoint should be read.
+ * @return Map from SystemStreamPartition to last checkpointed offset, or an
+ * empty map when the checkpoint manager returns no checkpoint for the
+ * partition.
+ */
+ private def restoreOffsetsFromCheckpoint(partition: Partition): Map[SystemStreamPartition, String] = {
+ debug("Loading checkpoints for partition: %s." format partition)
+
+ // The checkpoint may be null (the TaskInstance code that previously read
+ // checkpoints guarded against this), so it must not be dereferenced blindly.
+ val checkpoint = checkpointManager.readLastCheckpoint(partition)
+
+ if (checkpoint != null) {
+ checkpoint
+ .getOffsets
+ .map { case (systemStream, offset) => (new SystemStreamPartition(systemStream, partition), offset) }
+ .toMap
+ } else {
+ warn("No checkpoint found for partition: %s. This is allowed if this is your first time running the job, but if it's not, you've probably lost data." format partition)
+
+ Map[SystemStreamPartition, String]()
+ }
+ }
+
+ /**
+ * Removes offset settings for all SystemStreams that are to be forcibly
+ * reset using resetOffsets.
+ */
+ private def stripResetStreams {
+ val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets.keys)
+
+ systemStreamPartitionsToReset.foreach(systemStreamPartition => {
+ val offset = lastProcessedOffsets(systemStreamPartition)
+ info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
+ })
+
+ lastProcessedOffsets --= systemStreamPartitionsToReset
+ }
+
+ /**
+ * Returns a set of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
+ */
+ private def getSystemStreamPartitionsToReset(systemStreamPartitions: Iterable[SystemStreamPartition]): Set[SystemStreamPartition] = {
+ systemStreamPartitions
+ .filter(systemStreamPartition => {
+ val systemStream = systemStreamPartition.getSystemStream
+ offsetSettings
+ .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
+ .resetOffset
+ }).toSet
+ }
+
+ /**
+ * Use last processed offsets to get next available offset for each
+ * SystemStreamPartition, and populate startingOffsets.
+ */
+ private def loadStartingOffsets {
+ startingOffsets ++= lastProcessedOffsets
+ // Group offset map according to systemName.
+ .groupBy(_._1.getSystem)
+ // Get next offsets for each system.
+ .flatMap {
+ case (systemName, systemStreamPartitionOffsets) =>
+ systemAdmins
+ .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
+ .getOffsetsAfter(systemStreamPartitionOffsets)
+ }
+ }
+
+ /**
+ * Use defaultOffsets to get a next offset for every SystemStreamPartition
+ * that was registered, but has no offset.
+ *
+ * Throws a SamzaException if a registered SystemStreamPartition has no
+ * offset settings, or if its stream metadata has no entry for its partition.
+ */
+ private def loadDefaults {
+ systemStreamPartitions.foreach(systemStreamPartition => {
+ if (!startingOffsets.contains(systemStreamPartition)) {
+ val systemStream = systemStreamPartition.getSystemStream
+ val partition = systemStreamPartition.getPartition
+ val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
+ val systemStreamMetadata = offsetSetting.metadata
+ val offsetType = offsetSetting.defaultOffset
+
+ debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
+
+ val systemStreamPartitionMetadata = systemStreamMetadata
+ .getSystemStreamPartitionMetadata
+ .get(partition)
+
+ if (systemStreamPartitionMetadata != null) {
+ val nextOffset = systemStreamPartitionMetadata.getOffset(offsetType)
+
+ debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
+
+ startingOffsets += systemStreamPartition -> nextOffset
+ } else {
+ // Report the partition that is missing metadata; the metadata value
+ // itself is always null in this branch and would print as "null".
+ throw new SamzaException("No metadata available for partition %s." format systemStreamPartition)
+ }
+ }
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 517e9ae..d71ead1 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -31,6 +31,7 @@ object StreamConfig {
val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
+ val CONSUMER_OFFSET_DEFAULT = STREAM_PREFIX + "samza.offset.default"
implicit def Config2Stream(config: Config) = new StreamConfig(config)
}
@@ -64,6 +65,9 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case _ => false
}
+ def getDefaultStreamOffset(systemStream: SystemStream) =
+ getOption(StreamConfig.CONSUMER_OFFSET_DEFAULT format (systemStream.getSystem, systemStream.getStream))
+
/**
* Returns a list of all SystemStreams that have a serde defined from the config file.
*/
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index ce63a8a..5bb17c7 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -29,6 +29,7 @@ object SystemConfig {
val SYSTEM_FACTORY = "systems.%s.samza.factory"
val KEY_SERDE = "systems.%s.samza.key.serde"
val MSG_SERDE = "systems.%s.samza.msg.serde"
+ val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
implicit def Config2System(config: Config) = new SystemConfig(config)
}
@@ -40,6 +41,8 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getSystemMsgSerde(name: String) = getOption(SystemConfig.MSG_SERDE format name)
+ def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
+
/**
* Returns a list of all system names from the config file. Useful for
* getting individual systems.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 9c23244..c101b59 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -62,6 +62,7 @@ import org.apache.samza.system.chooser.RoundRobinChooserFactory
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.checkpoint.OffsetManager
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -100,12 +101,6 @@ object SamzaContainer extends Logging {
info("Got system names: %s" format systemNames)
- val resetInputStreams = systemNames.flatMap(systemName => {
- config.getResetOffsetMap(systemName)
- }).toMap
-
- info("Got input stream resets: %s" format resetInputStreams)
-
val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_))
debug("Got serde streams: %s" format serdeStreams)
@@ -279,6 +274,10 @@ object SamzaContainer extends Logging {
info("Got checkpoint manager: %s" format checkpointManager)
+ val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins)
+
+ info("Got offset manager: %s" format offsetManager)
+
val consumerMultiplexer = new SystemConsumers(
// TODO add config values for no new message timeout and max msgs per stream partition
chooser = chooser,
@@ -420,12 +419,11 @@ object SamzaContainer extends Logging {
metrics = taskInstanceMetrics,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
+ offsetManager = offsetManager,
storageManager = storageManager,
- checkpointManager = checkpointManager,
reporters = reporters,
listeners = listeners,
inputStreams = inputStreamsForThisPartition,
- resetInputStreams = resetInputStreams,
windowMs = taskWindowMs,
commitMs = taskCommitMs,
collector = collector)
@@ -440,8 +438,8 @@ object SamzaContainer extends Logging {
config = config,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
+ offsetManager = offsetManager,
metrics = samzaContainerMetrics,
- checkpointManager = checkpointManager,
reporters = reporters,
jvm = jvm)
}
@@ -482,7 +480,7 @@ class SamzaContainer(
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers,
metrics: SamzaContainerMetrics,
- checkpointManager: CheckpointManager = null,
+ offsetManager: OffsetManager = new OffsetManager,
reporters: Map[String, MetricsReporter] = Map(),
jvm: JvmMetrics = null) extends Runnable with Logging {
@@ -491,7 +489,7 @@ class SamzaContainer(
info("Starting container.")
startMetrics
- startCheckpoints
+ startOffsetManager
startStores
startTask
startProducers
@@ -524,7 +522,7 @@ class SamzaContainer(
shutdownProducers
shutdownTask
shutdownStores
- shutdownCheckpoints
+ shutdownOffsetManager
shutdownMetrics
info("Shutdown complete.")
@@ -550,18 +548,14 @@ class SamzaContainer(
})
}
- def startCheckpoints {
- info("Registering task instances with checkpoints.")
+ def startOffsetManager {
+ info("Registering task instances with offsets.")
- taskInstances.values.foreach(_.registerCheckpoints)
+ taskInstances.values.foreach(_.registerOffsets)
- if (checkpointManager != null) {
- info("Registering checkpoint manager.")
+ info("Starting offset manager.")
- checkpointManager.start
- } else {
- warn("No checkpoint manager defined. No consumer offsets will be maintained for this job.")
- }
+ offsetManager.start
}
def startStores {
@@ -666,13 +660,10 @@ class SamzaContainer(
taskInstances.values.foreach(_.shutdownStores)
}
- def shutdownCheckpoints {
- if (checkpointManager != null) {
- info("Shutting down checkpoint manager.")
- checkpointManager.stop
- } else {
- info("No checkpoint manager defined, so skipping checkpoint manager stop.")
- }
+ def shutdownOffsetManager {
+ info("Shutting down offset manager.")
+
+ offsetManager.stop
}
def shutdownMetrics {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 5127595..c4b135c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -1,5 +1,4 @@
/*
-
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -43,6 +42,8 @@ import org.apache.samza.system.SystemConsumers
import org.apache.samza.system.SystemProducers
import org.apache.samza.task.ReadableCoordinator
import org.apache.samza.metrics.Gauge
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.SamzaException
class TaskInstance(
task: StreamTask,
@@ -51,18 +52,16 @@ class TaskInstance(
metrics: TaskInstanceMetrics,
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers,
+ offsetManager: OffsetManager = new OffsetManager,
storageManager: TaskStorageManager = null,
- checkpointManager: CheckpointManager = null,
reporters: Map[String, MetricsReporter] = Map(),
listeners: Seq[TaskLifecycleListener] = Seq(),
inputStreams: Set[SystemStream] = Set(),
- resetInputStreams: Map[SystemStream, Boolean] = Map(),
windowMs: Long = -1,
commitMs: Long = 60000,
clock: () => Long = { System.currentTimeMillis },
collector: ReadableCollector = new ReadableCollector) extends Logging {
- var offsets = Map[SystemStream, String]()
var lastWindowMs = 0L
var lastCommitMs = 0L
val isInitableTask = task.isInstanceOf[InitableTask]
@@ -86,14 +85,12 @@ class TaskInstance(
reporters.values.foreach(_.register(metrics.source, metrics.registry))
}
- def registerCheckpoints {
- if (checkpointManager != null) {
- debug("Registering checkpoint manager for partition: %s." format partition)
+ def registerOffsets {
+ debug("Registering offsets for partition: %s." format partition)
- checkpointManager.register(partition)
- } else {
- debug("Skipping checkpoint manager registration for partition: %s." format partition)
- }
+ inputStreams.foreach(systemStream => {
+ offsetManager.register(new SystemStreamPartition(systemStream, partition))
+ })
}
def startStores {
@@ -127,34 +124,19 @@ class TaskInstance(
}
def registerConsumers {
- if (checkpointManager != null) {
- debug("Loading checkpoints for partition: %s." format partition)
-
- val checkpoint = checkpointManager.readLastCheckpoint(partition)
-
- if (checkpoint != null) {
- for ((systemStream, offset) <- checkpoint.getOffsets) {
- if (!resetInputStreams.getOrElse(systemStream, false)) {
- offsets += systemStream -> offset
-
- metrics.addOffsetGauge(systemStream, () => offsets(systemStream))
- } else {
- info("Got offset %s for %s and partition %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream, partition))
- }
- }
-
- info("Successfully loaded offsets for partition: %s, %s" format (partition, offsets))
- } else {
- warn("No checkpoint found for partition: %s. This is allowed if this is your first time running the job, but if it's not, you've probably lost data." format partition)
- }
- }
-
debug("Registering consumers for partition: %s." format partition)
- inputStreams.foreach(stream =>
- consumerMultiplexer.register(
- new SystemStreamPartition(stream, partition),
- offsets.get(stream).getOrElse(null)))
+ inputStreams.foreach(systemStream => {
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val offset = offsetManager.getStartingOffset(systemStreamPartition)
+ .getOrElse(throw new SamzaException("No offset defined for partition %s: %s" format (partition, systemStream)))
+ consumerMultiplexer.register(systemStreamPartition, offset)
+ metrics.addOffsetGauge(systemStream, () => {
+ offsetManager
+ .getLastProcessedOffset(systemStreamPartition)
+ .getOrElse(null)
+ })
+ })
}
def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
@@ -170,7 +152,7 @@ class TaskInstance(
trace("Updating offset map for partition: %s, %s, %s" format (partition, envelope.getSystemStreamPartition, envelope.getOffset))
- offsets += envelope.getSystemStreamPartition.getSystemStream -> envelope.getOffset
+ offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
}
def window(coordinator: ReadableCoordinator) {
@@ -221,11 +203,9 @@ class TaskInstance(
producerMultiplexer.flush(metrics.source)
- if (checkpointManager != null) {
- trace("Committing checkpoint manager for partition: %s" format partition)
+ trace("Committing offset manager for partition: %s" format partition)
- checkpointManager.writeCheckpoint(partition, new Checkpoint(offsets))
- }
+ offsetManager.checkpoint(partition)
lastCommitMs = clock()
} else {
@@ -258,9 +238,9 @@ class TaskInstance(
debug("Skipping storage manager shutdown for partition: %s" format partition)
}
}
-
+
override def toString() = "TaskInstance for class %s and partition %s." format (task.getClass.getName, partition)
-
- def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size)
-
+
+ def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size)
+
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index f7d3c8b..91c1813 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -30,6 +30,7 @@ import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamMetadata
import scala.collection.JavaConversions._
import org.apache.samza.SamzaException
+import org.apache.samza.system.SystemStreamMetadata.OffsetType
/**
* BootstrappingChooser is a composable MessageChooser that only chooses
@@ -114,7 +115,7 @@ class BootstrappingChooser(
// offset for this system stream partition, then we've already read all
// messages in the stream, and we're at head for this system stream
// partition.
- checkOffset(systemStreamPartition, offset, Upcoming)
+ checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
wrapped.register(systemStreamPartition, offset)
}
@@ -165,7 +166,7 @@ class BootstrappingChooser(
// If the offset we just read is the same as the offset for the last
// message (newest) in this system stream partition, then we have read
// all messages, and can mark this SSP as bootstrapped.
- checkOffset(systemStreamPartition, offset, Newest)
+ checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
}
envelope
@@ -207,7 +208,7 @@ class BootstrappingChooser(
* Upcoming is useful during the registration phase,
* and newest is useful during the choosing phase.
*/
- private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, newestOrUpcoming: OffsetType) {
+ private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
val systemStream = systemStreamPartition.getSystemStream
val systemStreamMetadata = bootstrapStreamMetadata.getOrElse(systemStreamPartition.getSystemStream, null)
// Metadata for system/stream, and system/stream/partition are allowed to
@@ -224,15 +225,11 @@ class BootstrappingChooser(
// null. A null partition metadata implies that the stream is not a
// bootstrap stream, and therefore, there is no need to check its offset.
null
- } else if (Newest.equals(newestOrUpcoming)) {
- systemStreamPartitionMetadata.getNewestOffset
- } else if (Upcoming.equals(newestOrUpcoming)) {
- systemStreamPartitionMetadata.getUpcomingOffset
} else {
- throw new SamzaException("Got unknown offset type %s" format newestOrUpcoming)
+ systemStreamPartitionMetadata.getOffset(offsetType)
}
- trace("Check %s offset %s against %s for %s." format (newestOrUpcoming.getClass.getSimpleName, offset, offsetToCheck, systemStreamPartition))
+ trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
// The SSP is no longer lagging if the envelope's offset equals the
// latest offset.
@@ -272,7 +269,3 @@ class BootstrappingChooserMetrics(val registry: MetricsRegistry = new MetricsReg
newGauge("%s-%s-lagging-partitions" format (systemStream.getSystem, systemStream.getStream), getValue)
}
}
-
-private sealed abstract class OffsetType
-private object Upcoming extends OffsetType
-private object Newest extends OffsetType
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
new file mode 100644
index 0000000..5844695
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import scala.collection.JavaConversions._
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamMetadata.OffsetType
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Test
+import org.apache.samza.SamzaException
+import org.apache.samza.util.TestUtil._
+import org.apache.samza.config.MapConfig
+import org.apache.samza.system.SystemAdmin
+
+class TestOffsetManager {
+ @Test
+ def testSystemShouldUseDefaults {
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+ val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
+ val offsetManager = OffsetManager(systemStreamMetadata, config)
+ offsetManager.register(systemStreamPartition)
+ offsetManager.start
+ assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
+ assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
+ assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+ }
+
+ @Test
+ def testShouldLoadFromAndSaveWithCheckpointManager {
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+ val config = new MapConfig
+ val checkpointManager = getCheckpointManager(systemStreamPartition)
+ val systemAdmins = Map("test-system" -> getSystemAdmin)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+ offsetManager.register(systemStreamPartition)
+ offsetManager.start
+ assertTrue(checkpointManager.isStarted)
+ assertEquals(1, checkpointManager.registered.size)
+ assertEquals(partition, checkpointManager.registered.head)
+ assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(partition))
+ // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
+ assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
+ assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+ offsetManager.update(systemStreamPartition, "46")
+ assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+ offsetManager.update(systemStreamPartition, "47")
+ assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+ // Should never update starting offset.
+ assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
+ offsetManager.checkpoint(partition)
+ val expectedCheckpoint = new Checkpoint(Map(systemStream -> "47"))
+ assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(partition))
+ }
+
+ @Test
+ def testShouldResetStreams {
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+ val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
+ val checkpoint = new Checkpoint(Map(systemStream -> "45"))
+ val checkpointManager = getCheckpointManager(systemStreamPartition)
+ val config = new MapConfig(Map(
+ "systems.test-system.samza.offset.default" -> "oldest",
+ "systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
+ offsetManager.register(systemStreamPartition)
+ offsetManager.start
+ assertTrue(checkpointManager.isStarted)
+ assertEquals(1, checkpointManager.registered.size)
+ assertEquals(partition, checkpointManager.registered.head)
+ assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition))
+ // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
+ assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+ }
+
+ @Test
+ def testShouldFailWhenMissingMetadata {
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val offsetManager = new OffsetManager
+ offsetManager.register(systemStreamPartition)
+
+ expect(classOf[SamzaException], Some("Attempting to load defaults for stream SystemStream [system=test-system, stream=test-stream], which has no offset settings.")) {
+ offsetManager.start
+ }
+ }
+
+ @Test
+ def testShouldFailWhenMissingDefault {
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+ val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
+ offsetManager.register(systemStreamPartition)
+
+ expect(classOf[SamzaException], Some("No default offeset defined for SystemStream [system=test-system, stream=test-stream]. Unable to load a default.")) {
+ offsetManager.start
+ }
+ }
+
+ @Test
+ def testDefaultSystemShouldFailWhenFailIsSpecified {
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+ val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail"))
+ expect(classOf[IllegalArgumentException]) {
+ OffsetManager(systemStreamMetadata, config)
+ }
+ }
+
+ @Test
+ def testDefaultStreamShouldFailWhenFailIsSpecified {
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+ val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail"))
+ expect(classOf[IllegalArgumentException]) {
+ OffsetManager(systemStreamMetadata, config)
+ }
+ }
+
+ private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
+ val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))
+
+ new CheckpointManager {
+ var isStarted = false
+ var isStopped = false
+ var registered = Set[Partition]()
+ var checkpoints = Map(systemStreamPartition.getPartition -> checkpoint)
+ def start { isStarted = true }
+ def register(partition: Partition) { registered += partition }
+ def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { checkpoints += partition -> checkpoint }
+ def readLastCheckpoint(partition: Partition) = checkpoints(partition)
+ def stop { isStopped = true }
+ }
+ }
+
+ private def getSystemAdmin = {
+ new SystemAdmin {
+ def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
+ offsets.mapValues(offset => (offset.toLong + 1).toString)
+
+ def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
+ Map[String, SystemStreamMetadata]()
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 5e9dd07..27b4ca5 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -36,6 +36,10 @@ import org.apache.samza.system.SystemConsumer
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import scala.collection.JavaConversions._
class TestTaskInstance {
@Test
@@ -53,20 +57,24 @@ class TestTaskInstance {
val producerMultiplexer = new SystemProducers(
Map[String, SystemProducer](),
new SerdeManager)
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ // Pretend our last checkpointed (next) offset was 2.
+ val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskInstance: TaskInstance = new TaskInstance(
task,
partition,
config,
new TaskInstanceMetrics,
- consumerMultiplexer: SystemConsumers,
- producerMultiplexer: SystemProducers)
- val systemStream = new SystemStream("test-system", "test-stream")
- // Pretend our last checkpointed offset was 1.
- taskInstance.offsets += systemStream -> "1"
- // Pretend we got a message with offset 2.
- taskInstance.process(new IncomingMessageEnvelope(new SystemStreamPartition("test-system", "test-stream", new Partition(0)), "2", null, null), new ReadableCoordinator)
- // Check to see if the offset map has been properly updated with offset 2.
- assertEquals(1, taskInstance.offsets.size)
- assertEquals("2", taskInstance.offsets(systemStream))
+ consumerMultiplexer,
+ producerMultiplexer,
+ offsetManager)
+ // Pretend we got a message with offset 2 and next offset 3.
+ taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), new ReadableCoordinator)
+ // Check to see if the offset manager has been properly updated with offset 2.
+ val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)
+ assertTrue(lastProcessedOffset.isDefined)
+ assertEquals("2", lastProcessedOffset.get)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 5325549..dafc980 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -37,6 +37,7 @@ import grizzled.slf4j.Logging
import java.util.UUID
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import kafka.consumer.ConsumerConfig
object KafkaSystemAdmin extends Logging {
/**
@@ -99,7 +100,7 @@ class KafkaSystemAdmin(
* from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
* configuration.
*/
- bufferSize: Int = 1024000,
+ bufferSize: Int = ConsumerConfig.SocketBufferSize,
/**
* The client ID to use for the simple consumer when fetching metadata from
@@ -109,6 +110,17 @@ class KafkaSystemAdmin(
import KafkaSystemAdmin._
+ /**
+ * Returns the offset for the message after the specified offset for each
+ * SystemStreamPartition that was passed in.
+ */
+ def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
+ // This is safe to do with Kafka, even if a topic is key-deduped. If the
+ // offset doesn't exist on a compacted topic, Kafka will return the first
+ // message AFTER the offset that was specified in the fetch request.
+ offsets.mapValues(offset => (offset.toLong + 1).toString)
+ }
+
def getSystemStreamMetadata(streams: java.util.Set[String]) =
getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
@@ -127,7 +139,7 @@ class KafkaSystemAdmin(
var done = false
var consumer: SimpleConsumer = null
- debug("Fetching offsets for: %s" format streams)
+ debug("Fetching system stream metadata for: %s" format streams)
while (!done) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index eaa9e53..cd9d926 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -274,12 +274,23 @@ class TestKafkaSystemAdmin {
val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
- new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0")
- )))
+ new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0"))))
+ }
+
+ @Test
+ def testOffsetsAfter {
+ val systemAdmin = new KafkaSystemAdmin("test", brokers)
+ val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+ val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
+ ssp1 -> "1",
+ ssp2 -> "2"))
+ assertEquals("2", offsetsAfter(ssp1))
+ assertEquals("3", offsetsAfter(ssp2))
}
class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
- import kafka.api.{TopicMetadata, TopicMetadataResponse}
+ import kafka.api.{ TopicMetadata, TopicMetadataResponse }
// Simulate Kafka telling us that the leader for the topic is not available
override def getTopicMetadata(topics: Set[String]) = {
@@ -292,7 +303,7 @@ class TestKafkaSystemAdmin {
class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy {
var countCalls = 0
-
+
override def sleep() = {
if (countCalls >= maxCalls) throw new CallLimitReached
countCalls += 1
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index 2abe1c8..fa1d51b 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
/**
@@ -51,4 +52,15 @@ public class MockSystemAdmin implements SystemAdmin {
return metadata;
}
+
+ @Override
+ public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+ Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
+
+ for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+ offsetsAfter.put(systemStreamPartition, null);
+ }
+
+ return offsetsAfter;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 7e81387..8177cbf 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -211,24 +211,16 @@ class TestStatefulTask {
"stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
"stores.mystore.key.serde" -> "string",
"stores.mystore.msg.serde" -> "string",
- "stores.mystore.changelog" -> "kafka-state.mystore",
-
- // TODO we don't need two different systems once SAMZA-157 is committed.
- // We will be able to do per-stream offset defaults.
+ "stores.mystore.changelog" -> "kafka.mystore",
// Use smallest reset for input streams, so we can fix SAMZA-166.
"systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
- "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
- "systems.kafka.consumer.auto.offset.reset" -> "smallest",
- "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+ "systems.kafka.samza.offset.default" -> "oldest",
"systems.kafka.samza.msg.serde" -> "string",
-
- // Use largest offset for changelog stream, so we can test SAMZA-142.
- "systems.kafka-state.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
- "systems.kafka-state.consumer.zookeeper.connect" -> zkConnect,
- "systems.kafka-state.consumer.auto.offset.reset" -> "smallest",
- "systems.kafka-state.producer.metadata.broker.list" -> ("localhost:%s" format port1),
- "systems.kafka-state.samza.msg.serde" -> "string")
+ "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
+ // Use largest offset for reset, so we can test SAMZA-142.
+ "systems.kafka.consumer.auto.offset.reset" -> "largest",
+ "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1))
@Test
def testShouldStartAndRestore {