You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text rendering).
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/14 01:11:32 UTC
git commit: Seeing issues when running with SAMZA-157, so reverting.
Repository: incubator-samza
Updated Branches:
refs/heads/master 3a6f2555a -> 2811c2a75
Seeing issues when running with SAMZA-157, so reverting.
This reverts commit 3a6f2555aa92d47dd85b54f3f1503719c08d31a7.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2811c2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2811c2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2811c2a7
Branch: refs/heads/master
Commit: 2811c2a758dd2d387ed4ca25c757516dfcece70b
Parents: 3a6f255
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Thu Mar 13 17:11:07 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Mar 13 17:11:07 2014 -0700
----------------------------------------------------------------------
.../org/apache/samza/system/SystemAdmin.java | 14 +-
.../samza/system/SystemStreamMetadata.java | 50 ---
...inglePartitionWithoutOffsetsSystemAdmin.java | 12 -
.../apache/samza/checkpoint/OffsetManager.scala | 351 -------------------
.../org/apache/samza/config/StreamConfig.scala | 4 -
.../org/apache/samza/config/SystemConfig.scala | 3 -
.../apache/samza/container/SamzaContainer.scala | 47 ++-
.../apache/samza/container/TaskInstance.scala | 72 ++--
.../system/chooser/BootstrappingChooser.scala | 19 +-
.../samza/checkpoint/TestOffsetManager.scala | 188 ----------
.../samza/container/TestTaskInstance.scala | 28 +-
.../samza/system/kafka/KafkaSystemAdmin.scala | 16 +-
.../system/kafka/TestKafkaSystemAdmin.scala | 19 +-
.../samza/system/mock/MockSystemAdmin.java | 12 -
.../test/integration/TestStatefulTask.scala | 20 +-
15 files changed, 118 insertions(+), 737 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 3976253..437bfb2 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -29,21 +29,9 @@ import java.util.Set;
* system.
*/
public interface SystemAdmin {
-
- /**
- * Fetches the offsets for the messages immediately after the supplied offsets
- * for a group of SystemStreamPartitions.
- *
- * @param offsets
- * Map from SystemStreamPartition to current offsets.
- * @return Map from SystemStreamPartition to offsets immediately after the
- * current offsets.
- */
- Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets);
-
/**
* Fetch metadata from a system for a set of streams.
- *
+ *
* @param streamNames
* The streams to to fetch metadata for.
* @return A map from stream name to SystemStreamMetadata for each stream
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
index 817c557..32e142a 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
@@ -21,9 +21,7 @@ package org.apache.samza.system;
import java.util.Collections;
import java.util.Map;
-
import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
/**
* SystemAdmins use this class to return useful metadata about a stream's offset
@@ -141,23 +139,6 @@ public class SystemStreamMetadata {
return upcomingOffset;
}
- /**
- * @param offsetType
- * The type of offset to get. Either oldest, newest, or upcoming.
- * @return The corresponding offset for the offset type requested.
- */
- public String getOffset(OffsetType offsetType) {
- if (offsetType.equals(OffsetType.OLDEST)) {
- return getOldestOffset();
- } else if (offsetType.equals(OffsetType.NEWEST)) {
- return getNewestOffset();
- } else if (offsetType.equals(OffsetType.UPCOMING)) {
- return getUpcomingOffset();
- } else {
- throw new SamzaException("Invalid offset type defined " + offsetType + ".");
- }
- }
-
@Override
public int hashCode() {
final int prime = 31;
@@ -200,35 +181,4 @@ public class SystemStreamMetadata {
return "SystemStreamPartitionMetadata [oldestOffset=" + oldestOffset + ", newestOffset=" + newestOffset + ", upcomingOffset=" + upcomingOffset + "]";
}
}
-
- /**
- * OffsetType is an enum used to define which offset should be used when
- * reading from a SystemStreamPartition for the first time.
- */
- public enum OffsetType {
-
- /**
- * Signals the offset of the oldest message in a SystemStreamPartition.
- */
- OLDEST("oldest"),
-
- /**
- * Signals the offset of the newest message in a SystemStreamPartition.
- */
- NEWEST("newest"),
-
- /**
- * Signals the offset of the next message to be written into a
- * SystemStreamPartition. If the offset of the most recent message written
- * to a SystemStreamPartition is 7, then upcoming would signal offset 8
- * (assuming the offsets were incremental).
- */
- UPCOMING("upcoming");
-
- private final String offsetType;
-
- private OffsetType(String offsetType) {
- this.offsetType = offsetType;
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 38e313f..44fd82a 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -26,7 +26,6 @@ import org.apache.samza.Partition;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
-import org.apache.samza.system.SystemStreamPartition;
/**
* A simple helper admin class that defines a single partition (partition 0) for
@@ -52,15 +51,4 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
return metadata;
}
-
- @Override
- public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
- Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
-
- for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
- offsetsAfter.put(systemStreamPartition, null);
- }
-
- return offsetsAfter;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
deleted file mode 100644
index 80341c8..0000000
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint
-
-import org.apache.samza.system.SystemStream
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
-import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
-import grizzled.slf4j.Logging
-import org.apache.samza.config.Config
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.system.SystemAdmin
-
-/**
- * OffsetSetting encapsulates a SystemStream's metadata, default offset, and
- * reset offset settings. It's just a convenience class to make OffsetManager
- * easier to work with.
- */
-case class OffsetSetting(
- /**
- * The metadata for the SystemStream.
- */
- metadata: SystemStreamMetadata,
-
- /**
- * The default offset (oldest, newest, or upcoming) for the SystemStream.
- * This setting is used when no checkpoint is available for a SystemStream
- * if the job is starting for the first time, or the SystemStream has been
- * reset (see resetOffsets, below).
- */
- defaultOffset: OffsetType,
-
- /**
- * Whether the SystemStream's offset should be reset or not. Determines
- * whether an offset should be ignored at initialization time, even if a
- * checkpoint is available. This is useful for jobs that wish to restart
- * reading from a stream at a different position than where they last
- * checkpointed. If this is true, then defaultOffset will be used to find
- * the new starting position in the stream.
- */
- resetOffset: Boolean)
-
-/**
- * OffsetManager object is a helper that does wiring to build an OffsetManager
- * from a config object.
- */
-object OffsetManager extends Logging {
- def apply(
- systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
- config: Config,
- checkpointManager: CheckpointManager = null,
- systemAdmins: Map[String, SystemAdmin] = Map()) = {
-
- debug("Building offset manager for %s." format systemStreamMetadata)
-
- val offsetSettings = systemStreamMetadata
- .map {
- case (systemStream, systemStreamMetadata) =>
- // Get default offset.
- val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
- val systemDefaultOffset = config.getDefaultSystemOffset(systemStream.getSystem)
- val defaultOffsetType = if (streamDefaultOffset.isDefined) {
- OffsetType.valueOf(streamDefaultOffset.get.toUpperCase)
- } else if (systemDefaultOffset.isDefined) {
- OffsetType.valueOf(systemDefaultOffset.get.toUpperCase)
- } else {
- info("No default offset for %s defined. Using newest." format systemStream)
- OffsetType.UPCOMING
- }
- debug("Using default offset %s for %s." format (defaultOffsetType, systemStream))
-
- // Get reset offset.
- val resetOffset = config.getResetOffset(systemStream)
- debug("Using reset offset %s for %s." format (resetOffset, systemStream))
-
- // Build OffsetSetting so we can create a map for OffsetManager.
- (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
- }.toMap
- new OffsetManager(offsetSettings, checkpointManager, systemAdmins)
- }
-}
-
-/**
- * OffsetManager does several things:
- *
- * <ul>
- * <li>Loads last checkpointed offset for all input SystemStreamPartitions in a
- * SamzaContainer.</li>
- * <li>Uses last checkpointed offset to figure out the next offset to start
- * reading from for each input SystemStreamPartition in a SamzaContainer</li>
- * <li>Keep track of the last processed offset for each SystemStreamPartitions
- * in a SamzaContainer.</li>
- * <li>Checkpoints the last processed offset for each SystemStreamPartitions
- * in a SamzaContainer periodically to the CheckpointManager.</li>
- * </ul>
- *
- * All partitions must be registered before start is called, and start must be
- * called before get/update/checkpoint/stop are called.
- */
-class OffsetManager(
-
- /**
- * Offset settings for all streams that the OffsetManager is managing.
- */
- val offsetSettings: Map[SystemStream, OffsetSetting] = Map(),
-
- /**
- * Optional checkpoint manager for checkpointing offsets whenever
- * checkpoint is called.
- */
- val checkpointManager: CheckpointManager = null,
-
- /**
- * SystemAdmins that are used to get next offsets from last checkpointed
- * offsets. Map is from system name to SystemAdmin class for the system.
- */
- val systemAdmins: Map[String, SystemAdmin] = Map()) extends Logging {
-
- /**
- * Last offsets processed for each SystemStreamPartition.
- */
- var lastProcessedOffsets = Map[SystemStreamPartition, String]()
-
- /**
- * Offsets to start reading from for each SystemStreamPartition. This
- * variable is populated after all checkpoints have been restored.
- */
- var startingOffsets = Map[SystemStreamPartition, String]()
-
- /**
- * The set of system stream partitions that have been registered with the
- * OffsetManager. These are the SSPs that will be tracked within the offset
- * manager.
- */
- var systemStreamPartitions = Set[SystemStreamPartition]()
-
- def register(systemStreamPartition: SystemStreamPartition) {
- systemStreamPartitions += systemStreamPartition
- }
-
- def start {
- registerCheckpointManager
- loadOffsetsFromCheckpointManager
- stripResetStreams
- loadStartingOffsets
- loadDefaults
-
- info("Successfully loaded last processed offsets: %s" format lastProcessedOffsets)
- info("Successfully loaded starting offsets: %s" format startingOffsets)
- }
-
- /**
- * Set the last processed offset for a given SystemStreamPartition.
- */
- def update(systemStreamPartition: SystemStreamPartition, offset: String) {
- lastProcessedOffsets += systemStreamPartition -> offset
- }
-
- /**
- * Get the last processed offset for a SystemStreamPartition.
- */
- def getLastProcessedOffset(systemStreamPartition: SystemStreamPartition) = {
- lastProcessedOffsets.get(systemStreamPartition)
- }
-
- /**
- * Get the starting offset for a SystemStreamPartition. This is the offset
- * where a SamzaContainer begins reading from when it starts up.
- */
- def getStartingOffset(systemStreamPartition: SystemStreamPartition) = {
- startingOffsets.get(systemStreamPartition)
- }
-
- /**
- * Checkpoint all offsets for a given partition using the CheckpointManager.
- */
- def checkpoint(partition: Partition) {
- if (checkpointManager != null) {
- debug("Checkpointing offsets for partition %s." format partition)
-
- val partitionOffsets = lastProcessedOffsets
- .filterKeys(_.getPartition.equals(partition))
- .map { case (systemStreamPartition, offset) => (systemStreamPartition.getSystemStream, offset) }
- .toMap
-
- checkpointManager.writeCheckpoint(partition, new Checkpoint(partitionOffsets))
- } else {
- debug("Skipping checkpointing for partition %s because no checkpoint manager is defined." format partition)
- }
- }
-
- def stop {
- if (checkpointManager != null) {
- debug("Shutting down checkpoint manager.")
-
- checkpointManager.stop
- } else {
- debug("Skipping checkpoint manager shutdown because no checkpoint manager is defined.")
- }
- }
-
- /**
- * Returns a set of partitions that have been registered with this offset
- * manager.
- */
- private def getPartitions = {
- systemStreamPartitions
- .map(_.getPartition)
- .toSet
- }
-
- /**
- * Register all partitions with the CheckpointManager.
- */
- private def registerCheckpointManager {
- if (checkpointManager != null) {
- debug("Registering checkpoint manager.")
-
- getPartitions.foreach(checkpointManager.register)
- } else {
- debug("Skipping checkpoint manager registration because no manager was defined.")
- }
- }
-
- /**
- * Loads last processed offsets from checkpoint manager for all registered
- * partitions.
- */
- private def loadOffsetsFromCheckpointManager {
- if (checkpointManager != null) {
- debug("Loading offsets from checkpoint manager.")
-
- checkpointManager.start
-
- lastProcessedOffsets ++= getPartitions.flatMap(restoreOffsetsFromCheckpoint(_))
- } else {
- debug("Skipping offset load from checkpoint manager because no manager was defined.")
- }
- }
-
- /**
- * Loads last processed offsets for a single partition.
- */
- private def restoreOffsetsFromCheckpoint(partition: Partition): Map[SystemStreamPartition, String] = {
- debug("Loading checkpoints for partition: %s." format partition)
-
- checkpointManager
- .readLastCheckpoint(partition)
- .getOffsets
- .map { case (systemStream, offset) => (new SystemStreamPartition(systemStream, partition), offset) }
- .toMap
- }
-
- /**
- * Removes offset settings for all SystemStreams that are to be forcibly
- * reset using resetOffsets.
- */
- private def stripResetStreams {
- val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets.keys)
-
- systemStreamPartitionsToReset.foreach(systemStreamPartition => {
- val offset = lastProcessedOffsets(systemStreamPartition)
- info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
- })
-
- lastProcessedOffsets --= systemStreamPartitionsToReset
- }
-
- /**
- * Returns a set of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
- */
- private def getSystemStreamPartitionsToReset(systemStreamPartitions: Iterable[SystemStreamPartition]): Set[SystemStreamPartition] = {
- systemStreamPartitions
- .filter(systemStreamPartition => {
- val systemStream = systemStreamPartition.getSystemStream
- offsetSettings
- .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
- .resetOffset
- }).toSet
- }
-
- /**
- * Use last processed offsets to get next available offset for each
- * SystemStreamPartition, and populate startingOffsets.
- */
- private def loadStartingOffsets {
- startingOffsets ++= lastProcessedOffsets
- // Group offset map according to systemName.
- .groupBy(_._1.getSystem)
- // Get next offsets for each system.
- .flatMap {
- case (systemName, systemStreamPartitionOffsets) =>
- systemAdmins
- .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
- .getOffsetsAfter(systemStreamPartitionOffsets)
- }
- }
-
- /**
- * Use defaultOffsets to get a next offset for every SystemStreamPartition
- * that was registered, but has no offset.
- */
- private def loadDefaults {
- systemStreamPartitions.foreach(systemStreamPartition => {
- if (!startingOffsets.contains(systemStreamPartition)) {
- val systemStream = systemStreamPartition.getSystemStream
- val partition = systemStreamPartition.getPartition
- val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
- val systemStreamMetadata = offsetSetting.metadata
- val offsetType = offsetSetting.defaultOffset
-
- debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
-
- val systemStreamPartitionMetadata = systemStreamMetadata
- .getSystemStreamPartitionMetadata
- .get(partition)
-
- if (systemStreamPartitionMetadata != null) {
- val nextOffset = systemStreamPartitionMetadata.getOffset(offsetType)
-
- debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
-
- startingOffsets += systemStreamPartition -> nextOffset
- } else {
- throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata)
- }
- }
- })
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index d71ead1..517e9ae 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -31,7 +31,6 @@ object StreamConfig {
val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
- val CONSUMER_OFFSET_DEFAULT = STREAM_PREFIX + "samza.offset.default"
implicit def Config2Stream(config: Config) = new StreamConfig(config)
}
@@ -65,9 +64,6 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case _ => false
}
- def getDefaultStreamOffset(systemStream: SystemStream) =
- getOption(StreamConfig.CONSUMER_OFFSET_DEFAULT format (systemStream.getSystem, systemStream.getStream))
-
/**
* Returns a list of all SystemStreams that have a serde defined from the config file.
*/
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 5bb17c7..ce63a8a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -29,7 +29,6 @@ object SystemConfig {
val SYSTEM_FACTORY = "systems.%s.samza.factory"
val KEY_SERDE = "systems.%s.samza.key.serde"
val MSG_SERDE = "systems.%s.samza.msg.serde"
- val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
implicit def Config2System(config: Config) = new SystemConfig(config)
}
@@ -41,8 +40,6 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getSystemMsgSerde(name: String) = getOption(SystemConfig.MSG_SERDE format name)
- def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
-
/**
* Returns a list of all system names from the config file. Useful for
* getting individual systems.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c101b59..9c23244 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -62,7 +62,6 @@ import org.apache.samza.system.chooser.RoundRobinChooserFactory
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.checkpoint.OffsetManager
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -101,6 +100,12 @@ object SamzaContainer extends Logging {
info("Got system names: %s" format systemNames)
+ val resetInputStreams = systemNames.flatMap(systemName => {
+ config.getResetOffsetMap(systemName)
+ }).toMap
+
+ info("Got input stream resets: %s" format resetInputStreams)
+
val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_))
debug("Got serde streams: %s" format serdeStreams)
@@ -274,10 +279,6 @@ object SamzaContainer extends Logging {
info("Got checkpoint manager: %s" format checkpointManager)
- val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins)
-
- info("Got offset manager: %s" format offsetManager)
-
val consumerMultiplexer = new SystemConsumers(
// TODO add config values for no new message timeout and max msgs per stream partition
chooser = chooser,
@@ -419,11 +420,12 @@ object SamzaContainer extends Logging {
metrics = taskInstanceMetrics,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
- offsetManager = offsetManager,
storageManager = storageManager,
+ checkpointManager = checkpointManager,
reporters = reporters,
listeners = listeners,
inputStreams = inputStreamsForThisPartition,
+ resetInputStreams = resetInputStreams,
windowMs = taskWindowMs,
commitMs = taskCommitMs,
collector = collector)
@@ -438,8 +440,8 @@ object SamzaContainer extends Logging {
config = config,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
- offsetManager = offsetManager,
metrics = samzaContainerMetrics,
+ checkpointManager = checkpointManager,
reporters = reporters,
jvm = jvm)
}
@@ -480,7 +482,7 @@ class SamzaContainer(
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers,
metrics: SamzaContainerMetrics,
- offsetManager: OffsetManager = new OffsetManager,
+ checkpointManager: CheckpointManager = null,
reporters: Map[String, MetricsReporter] = Map(),
jvm: JvmMetrics = null) extends Runnable with Logging {
@@ -489,7 +491,7 @@ class SamzaContainer(
info("Starting container.")
startMetrics
- startOffsetManager
+ startCheckpoints
startStores
startTask
startProducers
@@ -522,7 +524,7 @@ class SamzaContainer(
shutdownProducers
shutdownTask
shutdownStores
- shutdownOffsetManager
+ shutdownCheckpoints
shutdownMetrics
info("Shutdown complete.")
@@ -548,14 +550,18 @@ class SamzaContainer(
})
}
- def startOffsetManager {
- info("Registering task instances with offsets.")
+ def startCheckpoints {
+ info("Registering task instances with checkpoints.")
- taskInstances.values.foreach(_.registerOffsets)
+ taskInstances.values.foreach(_.registerCheckpoints)
- info("Starting offset manager.")
+ if (checkpointManager != null) {
+ info("Registering checkpoint manager.")
- offsetManager.start
+ checkpointManager.start
+ } else {
+ warn("No checkpoint manager defined. No consumer offsets will be maintained for this job.")
+ }
}
def startStores {
@@ -660,10 +666,13 @@ class SamzaContainer(
taskInstances.values.foreach(_.shutdownStores)
}
- def shutdownOffsetManager {
- info("Shutting down offset manager.")
-
- offsetManager.stop
+ def shutdownCheckpoints {
+ if (checkpointManager != null) {
+ info("Shutting down checkpoint manager.")
+ checkpointManager.stop
+ } else {
+ info("No checkpoint manager defined, so skipping checkpoint manager stop.")
+ }
}
def shutdownMetrics {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c4b135c..5127595 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -1,4 +1,5 @@
/*
+
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -42,8 +43,6 @@ import org.apache.samza.system.SystemConsumers
import org.apache.samza.system.SystemProducers
import org.apache.samza.task.ReadableCoordinator
import org.apache.samza.metrics.Gauge
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.SamzaException
class TaskInstance(
task: StreamTask,
@@ -52,16 +51,18 @@ class TaskInstance(
metrics: TaskInstanceMetrics,
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers,
- offsetManager: OffsetManager = new OffsetManager,
storageManager: TaskStorageManager = null,
+ checkpointManager: CheckpointManager = null,
reporters: Map[String, MetricsReporter] = Map(),
listeners: Seq[TaskLifecycleListener] = Seq(),
inputStreams: Set[SystemStream] = Set(),
+ resetInputStreams: Map[SystemStream, Boolean] = Map(),
windowMs: Long = -1,
commitMs: Long = 60000,
clock: () => Long = { System.currentTimeMillis },
collector: ReadableCollector = new ReadableCollector) extends Logging {
+ var offsets = Map[SystemStream, String]()
var lastWindowMs = 0L
var lastCommitMs = 0L
val isInitableTask = task.isInstanceOf[InitableTask]
@@ -85,12 +86,14 @@ class TaskInstance(
reporters.values.foreach(_.register(metrics.source, metrics.registry))
}
- def registerOffsets {
- debug("Registering offsets for partition: %s." format partition)
+ def registerCheckpoints {
+ if (checkpointManager != null) {
+ debug("Registering checkpoint manager for partition: %s." format partition)
- inputStreams.foreach(systemStream => {
- offsetManager.register(new SystemStreamPartition(systemStream, partition))
- })
+ checkpointManager.register(partition)
+ } else {
+ debug("Skipping checkpoint manager registration for partition: %s." format partition)
+ }
}
def startStores {
@@ -124,19 +127,34 @@ class TaskInstance(
}
def registerConsumers {
+ if (checkpointManager != null) {
+ debug("Loading checkpoints for partition: %s." format partition)
+
+ val checkpoint = checkpointManager.readLastCheckpoint(partition)
+
+ if (checkpoint != null) {
+ for ((systemStream, offset) <- checkpoint.getOffsets) {
+ if (!resetInputStreams.getOrElse(systemStream, false)) {
+ offsets += systemStream -> offset
+
+ metrics.addOffsetGauge(systemStream, () => offsets(systemStream))
+ } else {
+ info("Got offset %s for %s and partition %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream, partition))
+ }
+ }
+
+ info("Successfully loaded offsets for partition: %s, %s" format (partition, offsets))
+ } else {
+ warn("No checkpoint found for partition: %s. This is allowed if this is your first time running the job, but if it's not, you've probably lost data." format partition)
+ }
+ }
+
debug("Registering consumers for partition: %s." format partition)
- inputStreams.foreach(systemStream => {
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val offset = offsetManager.getStartingOffset(systemStreamPartition)
- .getOrElse(throw new SamzaException("No offset defined for partition %s: %s" format (partition, systemStream)))
- consumerMultiplexer.register(systemStreamPartition, offset)
- metrics.addOffsetGauge(systemStream, () => {
- offsetManager
- .getLastProcessedOffset(systemStreamPartition)
- .getOrElse(null)
- })
- })
+ inputStreams.foreach(stream =>
+ consumerMultiplexer.register(
+ new SystemStreamPartition(stream, partition),
+ offsets.get(stream).getOrElse(null)))
}
def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
@@ -152,7 +170,7 @@ class TaskInstance(
trace("Updating offset map for partition: %s, %s, %s" format (partition, envelope.getSystemStreamPartition, envelope.getOffset))
- offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
+ offsets += envelope.getSystemStreamPartition.getSystemStream -> envelope.getOffset
}
def window(coordinator: ReadableCoordinator) {
@@ -203,9 +221,11 @@ class TaskInstance(
producerMultiplexer.flush(metrics.source)
- trace("Committing offset manager for partition: %s" format partition)
+ if (checkpointManager != null) {
+ trace("Committing checkpoint manager for partition: %s" format partition)
- offsetManager.checkpoint(partition)
+ checkpointManager.writeCheckpoint(partition, new Checkpoint(offsets))
+ }
lastCommitMs = clock()
} else {
@@ -238,9 +258,9 @@ class TaskInstance(
debug("Skipping storage manager shutdown for partition: %s" format partition)
}
}
-
+
override def toString() = "TaskInstance for class %s and partition %s." format (task.getClass.getName, partition)
-
- def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size)
-
+
+ def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size)
+
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 91c1813..f7d3c8b 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -30,7 +30,6 @@ import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamMetadata
import scala.collection.JavaConversions._
import org.apache.samza.SamzaException
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
/**
* BootstrappingChooser is a composable MessageChooser that only chooses
@@ -115,7 +114,7 @@ class BootstrappingChooser(
// offset for this system stream partition, then we've already read all
// messages in the stream, and we're at head for this system stream
// partition.
- checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
+ checkOffset(systemStreamPartition, offset, Upcoming)
wrapped.register(systemStreamPartition, offset)
}
@@ -166,7 +165,7 @@ class BootstrappingChooser(
// If the offset we just read is the same as the offset for the last
// message (newest) in this system stream partition, then we have read
// all messages, and can mark this SSP as bootstrapped.
- checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+ checkOffset(systemStreamPartition, offset, Newest)
}
envelope
@@ -208,7 +207,7 @@ class BootstrappingChooser(
* Upcoming is useful during the registration phase,
* and newest is useful during the choosing phase.
*/
- private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
+ private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, newestOrUpcoming: OffsetType) {
val systemStream = systemStreamPartition.getSystemStream
val systemStreamMetadata = bootstrapStreamMetadata.getOrElse(systemStreamPartition.getSystemStream, null)
// Metadata for system/stream, and system/stream/partition are allowed to
@@ -225,11 +224,15 @@ class BootstrappingChooser(
// null. A null partition metadata implies that the stream is not a
// bootstrap stream, and therefore, there is no need to check its offset.
null
+ } else if (Newest.equals(newestOrUpcoming)) {
+ systemStreamPartitionMetadata.getNewestOffset
+ } else if (Upcoming.equals(newestOrUpcoming)) {
+ systemStreamPartitionMetadata.getUpcomingOffset
} else {
- systemStreamPartitionMetadata.getOffset(offsetType)
+ throw new SamzaException("Got unknown offset type %s" format newestOrUpcoming)
}
- trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
+ trace("Check %s offset %s against %s for %s." format (newestOrUpcoming.getClass.getSimpleName, offset, offsetToCheck, systemStreamPartition))
// The SSP is no longer lagging if the envelope's offset equals the
// latest offset.
@@ -269,3 +272,7 @@ class BootstrappingChooserMetrics(val registry: MetricsRegistry = new MetricsReg
newGauge("%s-%s-lagging-partitions" format (systemStream.getSystem, systemStream.getStream), getValue)
}
}
+
+private sealed abstract class OffsetType
+private object Upcoming extends OffsetType
+private object Newest extends OffsetType
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
deleted file mode 100644
index 5844695..0000000
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint
-
-import scala.collection.JavaConversions._
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
-import org.junit.Test
-import org.apache.samza.SamzaException
-import org.apache.samza.util.TestUtil._
-import org.apache.samza.config.MapConfig
-import org.apache.samza.system.SystemAdmin
-
-class TestOffsetManager {
- @Test
- def testSystemShouldUseDefaults {
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
- val offsetManager = OffsetManager(systemStreamMetadata, config)
- offsetManager.register(systemStreamPartition)
- offsetManager.start
- assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
- assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
- assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
- }
-
- @Test
- def testShouldLoadFromAndSaveWithCheckpointManager {
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val config = new MapConfig
- val checkpointManager = getCheckpointManager(systemStreamPartition)
- val systemAdmins = Map("test-system" -> getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
- offsetManager.register(systemStreamPartition)
- offsetManager.start
- assertTrue(checkpointManager.isStarted)
- assertEquals(1, checkpointManager.registered.size)
- assertEquals(partition, checkpointManager.registered.head)
- assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(partition))
- // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
- assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
- assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
- offsetManager.update(systemStreamPartition, "46")
- assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
- offsetManager.update(systemStreamPartition, "47")
- assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
- // Should never update starting offset.
- assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
- offsetManager.checkpoint(partition)
- val expectedCheckpoint = new Checkpoint(Map(systemStream -> "47"))
- assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(partition))
- }
-
- @Test
- def testShouldResetStreams {
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
- val checkpoint = new Checkpoint(Map(systemStream -> "45"))
- val checkpointManager = getCheckpointManager(systemStreamPartition)
- val config = new MapConfig(Map(
- "systems.test-system.samza.offset.default" -> "oldest",
- "systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
- offsetManager.register(systemStreamPartition)
- offsetManager.start
- assertTrue(checkpointManager.isStarted)
- assertEquals(1, checkpointManager.registered.size)
- assertEquals(partition, checkpointManager.registered.head)
- assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition))
- // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
- assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
- }
-
- @Test
- def testShouldFailWhenMissingMetadata {
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val offsetManager = new OffsetManager
- offsetManager.register(systemStreamPartition)
-
- expect(classOf[SamzaException], Some("Attempting to load defaults for stream SystemStream [system=test-system, stream=test-stream], which has no offset settings.")) {
- offsetManager.start
- }
- }
-
- @Test
- def testShouldFailWhenMissingDefault {
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
- offsetManager.register(systemStreamPartition)
-
- expect(classOf[SamzaException], Some("No default offeset defined for SystemStream [system=test-system, stream=test-stream]. Unable to load a default.")) {
- offsetManager.start
- }
- }
-
- @Test
- def testDefaultSystemShouldFailWhenFailIsSpecified {
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail"))
- expect(classOf[IllegalArgumentException]) {
- OffsetManager(systemStreamMetadata, config)
- }
- }
-
- @Test
- def testDefaultStreamShouldFailWhenFailIsSpecified {
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail"))
- expect(classOf[IllegalArgumentException]) {
- OffsetManager(systemStreamMetadata, config)
- }
- }
-
- private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
- val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))
-
- new CheckpointManager {
- var isStarted = false
- var isStopped = false
- var registered = Set[Partition]()
- var checkpoints = Map(systemStreamPartition.getPartition -> checkpoint)
- def start { isStarted = true }
- def register(partition: Partition) { registered += partition }
- def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { checkpoints += partition -> checkpoint }
- def readLastCheckpoint(partition: Partition) = checkpoints(partition)
- def stop { isStopped = true }
- }
- }
-
- private def getSystemAdmin = {
- new SystemAdmin {
- def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
- offsets.mapValues(offset => (offset.toLong + 1).toString)
-
- def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
- Map[String, SystemStreamMetadata]()
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 27b4ca5..5e9dd07 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -36,10 +36,6 @@ import org.apache.samza.system.SystemConsumer
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import scala.collection.JavaConversions._
class TestTaskInstance {
@Test
@@ -57,24 +53,20 @@ class TestTaskInstance {
val producerMultiplexer = new SystemProducers(
Map[String, SystemProducer](),
new SerdeManager)
- val systemStream = new SystemStream("test-system", "test-stream")
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- // Pretend our last checkpointed (next) offset was 2.
- val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskInstance: TaskInstance = new TaskInstance(
task,
partition,
config,
new TaskInstanceMetrics,
- consumerMultiplexer,
- producerMultiplexer,
- offsetManager)
- // Pretend we got a message with offset 2 and next offset 3.
- taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), new ReadableCoordinator)
- // Check to see if the offset manager has been properly updated with offset 3.
- val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)
- assertTrue(lastProcessedOffset.isDefined)
- assertEquals("2", lastProcessedOffset.get)
+ consumerMultiplexer: SystemConsumers,
+ producerMultiplexer: SystemProducers)
+ val systemStream = new SystemStream("test-system", "test-stream")
+ // Pretend our last checkpointed offset was 1.
+ taskInstance.offsets += systemStream -> "1"
+ // Pretend we got a message with offset 2.
+ taskInstance.process(new IncomingMessageEnvelope(new SystemStreamPartition("test-system", "test-stream", new Partition(0)), "2", null, null), new ReadableCoordinator)
+ // Check to see if the offset map has been properly updated with offset 2.
+ assertEquals(1, taskInstance.offsets.size)
+ assertEquals("2", taskInstance.offsets(systemStream))
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index dafc980..5325549 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -37,7 +37,6 @@ import grizzled.slf4j.Logging
import java.util.UUID
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import kafka.consumer.ConsumerConfig
object KafkaSystemAdmin extends Logging {
/**
@@ -100,7 +99,7 @@ class KafkaSystemAdmin(
* from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
* configuration.
*/
- bufferSize: Int = ConsumerConfig.SocketBufferSize,
+ bufferSize: Int = 1024000,
/**
* The client ID to use for the simple consumer when fetching metadata from
@@ -110,17 +109,6 @@ class KafkaSystemAdmin(
import KafkaSystemAdmin._
- /**
- * Returns the offset for the message after the specified offset for each
- * SystemStreamPartition that was passed in.
- */
- def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
- // This is safe to do with Kafka, even if a topic is key-deduped. If the
- // offset doesn't exist on a compacted topic, Kafka will return the first
- // message AFTER the offset that was specified in the fetch request.
- offsets.mapValues(offset => (offset.toLong + 1).toString)
- }
-
def getSystemStreamMetadata(streams: java.util.Set[String]) =
getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
@@ -139,7 +127,7 @@ class KafkaSystemAdmin(
var done = false
var consumer: SimpleConsumer = null
- debug("Fetching system stream metadata for: %s" format streams)
+ debug("Fetching offsets for: %s" format streams)
while (!done) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index cd9d926..eaa9e53 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -274,23 +274,12 @@ class TestKafkaSystemAdmin {
val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
- new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0"))))
- }
-
- @Test
- def testOffsetsAfter {
- val systemAdmin = new KafkaSystemAdmin("test", brokers)
- val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
- val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
- val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
- ssp1 -> "1",
- ssp2 -> "2"))
- assertEquals("2", offsetsAfter(ssp1))
- assertEquals("3", offsetsAfter(ssp2))
+ new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0")
+ )))
}
class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
- import kafka.api.{ TopicMetadata, TopicMetadataResponse }
+ import kafka.api.{TopicMetadata, TopicMetadataResponse}
// Simulate Kafka telling us that the leader for the topic is not available
override def getTopicMetadata(topics: Set[String]) = {
@@ -303,7 +292,7 @@ class TestKafkaSystemAdmin {
class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy {
var countCalls = 0
-
+
override def sleep() = {
if (countCalls >= maxCalls) throw new CallLimitReached
countCalls += 1
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index fa1d51b..2abe1c8 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -25,7 +25,6 @@ import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
/**
@@ -52,15 +51,4 @@ public class MockSystemAdmin implements SystemAdmin {
return metadata;
}
-
- @Override
- public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
- Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
-
- for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
- offsetsAfter.put(systemStreamPartition, null);
- }
-
- return offsetsAfter;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 8177cbf..7e81387 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -211,16 +211,24 @@ class TestStatefulTask {
"stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
"stores.mystore.key.serde" -> "string",
"stores.mystore.msg.serde" -> "string",
- "stores.mystore.changelog" -> "kafka.mystore",
+ "stores.mystore.changelog" -> "kafka-state.mystore",
+
+ // TODO we don't need two different systems once SAMZA-157 is committed.
+ // We will be able to do per-stream offset defaults.
// Use smallest reset for input streams, so we can fix SAMZA-166.
"systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
- "systems.kafka.samza.offset.default" -> "oldest",
- "systems.kafka.samza.msg.serde" -> "string",
"systems.kafka.consumer.zookeeper.connect" -> zkConnect,
- // Use largest offset for reset, so we can test SAMZA-142.
- "systems.kafka.consumer.auto.offset.reset" -> "largest",
- "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1))
+ "systems.kafka.consumer.auto.offset.reset" -> "smallest",
+ "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+ "systems.kafka.samza.msg.serde" -> "string",
+
+ // Intended: largest offset for changelog stream, to test SAMZA-142. NOTE(review): reset below is "smallest", not "largest" — confirm which is intended.
+ "systems.kafka-state.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
+ "systems.kafka-state.consumer.zookeeper.connect" -> zkConnect,
+ "systems.kafka-state.consumer.auto.offset.reset" -> "smallest",
+ "systems.kafka-state.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+ "systems.kafka-state.samza.msg.serde" -> "string")
@Test
def testShouldStartAndRestore {