You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/08/24 16:50:45 UTC
[2/2] samza git commit: SAMZA-676: Implemented broadcast stream
SAMZA-676: Implemented broadcast stream
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1f77f8b9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1f77f8b9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1f77f8b9
Branch: refs/heads/master
Commit: 1f77f8b986e1e58982704b71ea9bb497ac0f70df
Parents: 79ec5db
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Aug 24 10:46:16 2015 -0400
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Aug 24 10:46:16 2015 -0400
----------------------------------------------------------------------
checkstyle/import-control.xml | 6 +
.../versioned/container/samza-container.md | 10 +
.../versioned/jobs/configuration-table.html | 16 ++
.../org/apache/samza/system/SystemAdmin.java | 10 +
...inglePartitionWithoutOffsetsSystemAdmin.java | 4 +
.../org/apache/samza/config/TaskConfigJava.java | 84 ++++++++
.../grouper/stream/GroupByPartition.java | 83 ++++++++
.../grouper/stream/GroupByPartitionFactory.java | 29 +++
.../stream/GroupBySystemStreamPartition.java | 80 ++++++++
.../GroupBySystemStreamPartitionFactory.java | 31 +++
.../apache/samza/checkpoint/OffsetManager.scala | 201 ++++++++++++-------
.../org/apache/samza/config/JobConfig.scala | 2 +-
.../org/apache/samza/container/RunLoop.scala | 23 ++-
.../apache/samza/container/SamzaContainer.scala | 5 +-
.../apache/samza/container/TaskInstance.scala | 71 +++++--
.../samza/container/TaskInstanceMetrics.scala | 1 +
.../grouper/stream/GroupByPartition.scala | 41 ----
.../stream/GroupBySystemStreamPartition.scala | 38 ----
.../filereader/FileReaderSystemAdmin.scala | 4 +
.../apache/samza/config/TestTaskConfigJava.java | 61 ++++++
.../grouper/stream/TestGroupByPartition.java | 130 ++++++++++++
.../TestGroupBySystemStreamPartition.java | 104 ++++++++++
.../samza/checkpoint/TestOffsetManager.scala | 39 ++--
.../apache/samza/container/TestRunLoop.scala | 16 ++
.../samza/container/TestSamzaContainer.scala | 1 +
.../samza/container/TestTaskInstance.scala | 92 ++++++++-
.../grouper/stream/GroupByTestBase.scala | 58 ------
.../grouper/stream/TestGroupByPartition.scala | 39 ----
.../TestGroupBySystemStreamPartition.scala | 42 ----
.../samza/coordinator/TestJobCoordinator.scala | 2 +
.../elasticsearch/ElasticsearchSystemAdmin.java | 5 +
.../samza/system/hdfs/HdfsSystemAdmin.scala | 3 +
.../samza-hdfs-test-batch-job-text.properties | 17 ++
.../samza-hdfs-test-batch-job.properties | 17 ++
.../samza-hdfs-test-job-text.properties | 17 ++
.../resources/samza-hdfs-test-job.properties | 16 ++
.../samza/system/kafka/KafkaSystemAdmin.scala | 34 ++--
.../system/kafka/KafkaSystemConsumer.scala | 9 +-
.../samza/system/kafka/KafkaSystemFactory.scala | 1 +
.../system/kafka/TestKafkaSystemConsumer.scala | 36 +++-
.../samza/system/mock/MockSystemAdmin.java | 5 +
.../yarn/TestSamzaAppMasterTaskManager.scala | 2 +
42 files changed, 1117 insertions(+), 368 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index aaa235a..bc07ae8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -33,6 +33,10 @@
<subpackage name="config">
<allow class="org.apache.samza.SamzaException" />
+ <allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.util" />
+
+ <allow class="org.apache.samza.Partition" />
</subpackage>
<subpackage name="serializers">
@@ -113,6 +117,8 @@
<subpackage name="stream">
<allow pkg="org.apache.samza.container" />
<allow pkg="org.apache.samza.system" />
+
+ <allow class="org.apache.samza.Partition" />
</subpackage>
<subpackage name="task">
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/docs/learn/documentation/versioned/container/samza-container.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/samza-container.md b/docs/learn/documentation/versioned/container/samza-container.md
index 9f46414..f97e8a3 100644
--- a/docs/learn/documentation/versioned/container/samza-container.md
+++ b/docs/learn/documentation/versioned/container/samza-container.md
@@ -102,4 +102,14 @@ Thus, if you want two events in different streams to be processed by the same ta
There is one caveat in all of this: Samza currently assumes that a stream's partition count will never change. Partition splitting or repartitioning is not supported. If an input stream has N partitions, it is expected that it has always had, and will always have N partitions. If you want to re-partition a stream, you can write a job that reads messages from the stream, and writes them out to a new stream with the required number of partitions. For example, you could read messages from PageViewEvent, and write them to PageViewEventRepartition.
+### Broadcast Streams
+
+Starting with version 0.10.0, Samza supports broadcast streams: you can assign partitions of some streams to all tasks. For example, if you want every task to consume partitions 0 and 1 of a stream called broadcast-stream-1, and partition 2 of a stream called broadcast-stream-2, configure:
+
+{% highlight jproperties %}
+task.broadcast.inputs=yourSystem.broadcast-stream-1#[0-1], yourSystem.broadcast-stream-2#2
+{% endhighlight %}
+
+Use "#N" for a single partition and "#[N-M]" for an inclusive range of partitions: "#[0-1]" means partitions 0 and 1.
+
## [Streams »](streams.html)
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 8177fe5..78f2927 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -447,6 +447,22 @@
</tr>
<tr>
+ <td class="property" id="task-broadcast-inputs">task.broadcast.inputs</td>
+ <td class="default"></td>
+ <td class="description">
+ This property specifies the partitions that all tasks should consume. Every systemStreamPartition listed
+ here is assigned to every task, in addition to that task's regular input partitions. Ranges are inclusive.
+ <dl>
+ <dt>Format: <span class="system">system-name</span>.<span class="stream">stream-name</span>#<i>partitionId</i>
+ or <span class="system">system-name</span>.<span class="stream">stream-name</span>#[<i>startingPartitionId</i>-<i>endingPartitionId</i>]</dt>
+ </dl>
+ <dl>
+ <dt>Example: <code>task.broadcast.inputs=mySystem.broadcastStream#[1-2], mySystem.anotherBroadcastStream#1</code></dt>
+ </dl>
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
</tr>
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index a920a10..bc926c5 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -79,4 +79,14 @@ public interface SystemAdmin {
*/
void createCoordinatorStream(String streamName);
+ /**
+ * Compare the two offsets. -1, 0, +1 means offset1 < offset2,
+ * offset1 == offset2 and offset1 > offset2 respectively. Return
+ * null if those two offsets are not comparable.
+ *
+ * A boxed Integer is returned (rather than int) so that implementations
+ * which cannot order their offsets can return null.
+ *
+ * @param offset1 the first offset to compare
+ * @param offset2 the second offset to compare
+ * @return -1 if offset1 < offset2; 0 if offset1 == offset2; 1 if offset1 > offset2. Null if not comparable
+ */
+ Integer offsetComparator(String offset1, String offset2);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 63a1666..2157e69 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -81,4 +81,8 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
}
+ /**
+ * This admin never orders offsets: it always returns null, which the
+ * SystemAdmin contract defines as "not comparable".
+ */
+ @Override
+ public Integer offsetComparator(String offset1, String offset2) {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
new file mode 100644
index 0000000..015e994
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskConfigJava extends MapConfig {
+  // broadcast streams consumed by all tasks. e.g. kafka.foo#1
+  public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
+  // "system.stream#partitionId": group 1 is "system.stream", group 2 the partition id.
+  private static final Pattern BROADCAST_STREAM_PATTERN = Pattern.compile("([^#\\.]+\\.[^#\\.]+)#(\\d+)");
+  // "system.stream#[start-end]": group 1 is "system.stream", groups 2 and 3 the inclusive bounds.
+  // Exactly one closing bracket is accepted.
+  private static final Pattern BROADCAST_STREAM_RANGE_PATTERN = Pattern.compile("([^#\\.]+\\.[^#\\.]+)#\\[(\\d+)-(\\d+)\\]");
+  public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class);
+
+
+  public TaskConfigJava(Config config) {
+    super(config);
+  }
+
+  /**
+   * Get the systemStreamPartitions of the broadcast streams listed under
+   * {@value #BROADCAST_INPUT_STREAMS}. Each entry names either one partition
+   * ({@code system.stream#N}) or an inclusive range of partitions
+   * ({@code system.stream#[N-M]}). Entries are trimmed; blank entries are ignored.
+   * A range whose start exceeds its end adds nothing and logs a warning.
+   *
+   * @return a Set of SystemStreamPartitions
+   * @throws IllegalArgumentException if a non-blank entry matches neither format
+   */
+  public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
+    HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<SystemStreamPartition>();
+    List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS);
+
+    for (String rawEntry : systemStreamPartitions) {
+      // Trim so that "a#1, b#2" does not produce a system name with a leading space.
+      String systemStreamPartition = rawEntry.trim();
+      if (systemStreamPartition.isEmpty()) {
+        continue;
+      }
+
+      // Use capture groups rather than indexOf("[")/("]")/("-"): the name part may itself
+      // contain '[' or ']', which would make index-based slicing parse the wrong substring.
+      Matcher single = BROADCAST_STREAM_PATTERN.matcher(systemStreamPartition);
+      Matcher range = BROADCAST_STREAM_RANGE_PATTERN.matcher(systemStreamPartition);
+
+      if (single.matches()) {
+        SystemStream systemStream = Util.getSystemStreamFromNames(single.group(1));
+        int partitionId = Integer.parseInt(single.group(2));
+        systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(partitionId)));
+
+      } else if (range.matches()) {
+        SystemStream systemStream = Util.getSystemStreamFromNames(range.group(1));
+        int startingPartition = Integer.parseInt(range.group(2));
+        int endingPartition = Integer.parseInt(range.group(3));
+
+        if (startingPartition > endingPartition) {
+          LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added");
+        }
+        // Both bounds are inclusive.
+        for (int i = startingPartition; i <= endingPartition; i++) {
+          systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i)));
+        }
+      } else {
+        throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
+            + ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+      }
+    }
+    return systemStreamPartitionSet;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
new file mode 100644
index 0000000..3022b72
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+public class GroupByPartition implements SystemStreamPartitionGrouper {
+  // Partitions that every task receives on top of its own; stays empty unless configured.
+  private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>();
+
+  /**
+   * Creates a grouper with no broadcast partitions.
+   */
+  public GroupByPartition() {
+  }
+
+  /**
+   * Creates a grouper that reads broadcast partitions from the job config when the
+   * broadcast-inputs key is present.
+   *
+   * @param config job's config
+   */
+  public GroupByPartition(Config config) {
+    if (config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) {
+      broadcastStreams = new TaskConfigJava(config).getBroadcastSystemStreamPartitions();
+    }
+  }
+
+  /**
+   * Builds one task per partition id, named "Partition <id>", holding every non-broadcast
+   * input partition with that id. Each resulting task then also receives all broadcast partitions.
+   */
+  @Override
+  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
+    Map<TaskName, Set<SystemStreamPartition>> tasks = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    for (SystemStreamPartition ssp : ssps) {
+      if (!broadcastStreams.contains(ssp)) {
+        TaskName name = new TaskName("Partition " + ssp.getPartition().getPartitionId());
+        Set<SystemStreamPartition> members = tasks.get(name);
+        if (members == null) {
+          members = new HashSet<SystemStreamPartition>();
+          tasks.put(name, members);
+        }
+        members.add(ssp);
+      }
+    }
+
+    // Adding an empty broadcast set is a no-op, so no emptiness guard is needed.
+    for (Set<SystemStreamPartition> members : tasks.values()) {
+      members.addAll(broadcastStreams);
+    }
+
+    return tasks;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
new file mode 100644
index 0000000..608508e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+
+public class GroupByPartitionFactory implements SystemStreamPartitionGrouperFactory {
+  /**
+   * Builds a {@link GroupByPartition} from the job config, so that any broadcast
+   * inputs declared in the config are taken into account.
+   */
+  @Override
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    SystemStreamPartitionGrouper grouper = new GroupByPartition(config);
+    return grouper;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
new file mode 100644
index 0000000..a8b41de
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+public class GroupBySystemStreamPartition implements SystemStreamPartitionGrouper {
+  // Partitions handed to every task in addition to its own; stays empty unless configured.
+  private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>();
+
+  /**
+   * Creates a grouper with no broadcast partitions.
+   */
+  public GroupBySystemStreamPartition() {
+  }
+
+  /**
+   * Creates a grouper that reads broadcast partitions from the job config when the
+   * broadcast-inputs key is present.
+   *
+   * @param config job config
+   */
+  public GroupBySystemStreamPartition(Config config) {
+    if (config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) {
+      broadcastStreams = new TaskConfigJava(config).getBroadcastSystemStreamPartitions();
+    }
+  }
+
+  /**
+   * Creates one task per non-broadcast input partition, named after that partition's
+   * toString(). Each task's set holds that partition plus every broadcast partition.
+   */
+  @Override
+  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
+    Map<TaskName, Set<SystemStreamPartition>> tasks = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    for (SystemStreamPartition ssp : ssps) {
+      if (broadcastStreams.contains(ssp)) {
+        continue; // broadcast partitions never get a task of their own
+      }
+      Set<SystemStreamPartition> members = new HashSet<SystemStreamPartition>();
+      members.add(ssp);
+      members.addAll(broadcastStreams);
+      tasks.put(new TaskName(ssp.toString()), members);
+    }
+
+    return tasks;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
new file mode 100644
index 0000000..04a7444
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+
+public class GroupBySystemStreamPartitionFactory implements SystemStreamPartitionGrouperFactory {
+
+  /**
+   * Builds a {@link GroupBySystemStreamPartition} from the job config.
+   *
+   * The config must be passed through: the no-arg constructor never reads
+   * task.broadcast.inputs, so broadcast partitions would be silently ignored.
+   * This matches what GroupByPartitionFactory already does.
+   */
+  @Override
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    return new GroupBySystemStreamPartition(config);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 20e5d26..1464acc 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,8 @@ object OffsetManager extends Logging {
config: Config,
checkpointManager: CheckpointManager = null,
systemAdmins: Map[String, SystemAdmin] = Map(),
- offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
- latestOffsets: Map[SystemStreamPartition, String] = Map()) = {
+ offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
+ latestOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) = {
debug("Building offset manager for %s." format systemStreamMetadata)
val offsetSettings = systemStreamMetadata
@@ -142,25 +142,28 @@ class OffsetManager(
/**
* offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
*/
- val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+ val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
/*
* The previously read checkpoints restored from the coordinator stream
*/
- val previousCheckpointedOffsets: Map[SystemStreamPartition, String] = Map()
- ) extends Logging {
+ val previousCheckpointedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) extends Logging {
/**
* Last offsets processed for each SystemStreamPartition.
*/
// Filter out null offset values, we can't use them, these exist only because of SSP information
- var lastProcessedOffsets = previousCheckpointedOffsets.filter(_._2 != null)
+ // Keyed by TaskName, then SystemStreamPartition: the same SSP can belong to several tasks (e.g. broadcast inputs), each with its own offset.
+ var lastProcessedOffsets = previousCheckpointedOffsets.map {
+ case (taskName, sspToOffset) => {
+ taskName -> sspToOffset.filter(_._2 != null)
+ }
+ }
/**
* Offsets to start reading from for each SystemStreamPartition. This
* variable is populated after all checkpoints have been restored.
*/
- var startingOffsets = Map[SystemStreamPartition, String]()
+ // Keyed by TaskName, then SystemStreamPartition.
+ var startingOffsets = Map[TaskName, Map[SystemStreamPartition, String]]()
/**
* The set of system stream partitions that have been registered with the
@@ -172,7 +175,7 @@ class OffsetManager(
def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) {
systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister)
// register metrics
+ // NOTE(review): this walks every task registered so far, not only `taskName`, so addCheckpointedOffset is called again
+ // for previously registered SSPs (and once per task for broadcast SSPs) — confirm repeated registration is harmless.
- systemStreamPartitions.foreach{ case (taskName, ssp) => ssp.foreach (ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
+ systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
}
def start {
@@ -190,23 +193,32 @@ class OffsetManager(
/**
* Set the last processed offset for a given SystemStreamPartition.
*/
- def update(systemStreamPartition: SystemStreamPartition, offset: String) {
- lastProcessedOffsets += systemStreamPartition -> offset
+ def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
+   // The map is immutable, so rebuild this task's inner map (starting empty if the task is new) and store it back.
+   val offsetsForTask = lastProcessedOffsets.getOrElse(taskName, Map[SystemStreamPartition, String]())
+   lastProcessedOffsets += taskName -> (offsetsForTask + (systemStreamPartition -> offset))
}
/**
* Get the last processed offset for a SystemStreamPartition.
*/
- def getLastProcessedOffset(systemStreamPartition: SystemStreamPartition) = {
- lastProcessedOffsets.get(systemStreamPartition)
+ def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition): Option[String] = {
+   // None when either the task or the SSP has no recorded offset.
+   lastProcessedOffsets.get(taskName).flatMap(_.get(systemStreamPartition))
}
/**
* Get the starting offset for a SystemStreamPartition. This is the offset
* where a SamzaContainer begins reading from when it starts up.
*/
- def getStartingOffset(systemStreamPartition: SystemStreamPartition) = {
- startingOffsets.get(systemStreamPartition)
+ def getStartingOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition): Option[String] = {
+   // None when either the task or the SSP has no starting offset.
+   startingOffsets.get(taskName).flatMap(_.get(systemStreamPartition))
}
/**
@@ -217,10 +229,19 @@ class OffsetManager(
debug("Checkpointing offsets for taskName %s." format taskName)
val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet
- val partitionOffsets = lastProcessedOffsets.filterKeys(sspsForTaskName.contains(_))
+ val partitionOffsets = lastProcessedOffsets.get(taskName) match {
+ case Some(sspToOffsets) => sspToOffsets.filterKeys(sspsForTaskName.contains(_))
+ case None => {
+ warn(taskName + " is not found... ")
+ Map[SystemStreamPartition, String]()
+ }
+ }
checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
- lastProcessedOffsets.foreach{ case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
+ lastProcessedOffsets.get(taskName) match {
+ case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
+ case None =>
+ }
} else {
debug("Skipping checkpointing for taskName %s because no checkpoint manager is defined." format taskName)
}
@@ -262,7 +283,9 @@ class OffsetManager(
*/
private def loadOffsets {
debug("Loading offsets")
- lastProcessedOffsets.filter {
+ lastProcessedOffsets.map {
+ case (taskName, sspToOffsets) => {
+ taskName -> sspToOffsets.filter {
case (systemStreamPartition, offset) =>
val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
if (!shouldKeep) {
@@ -271,7 +294,8 @@ class OffsetManager(
info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
shouldKeep
}
-
+ }
+ }
}
/**
@@ -279,27 +303,43 @@ class OffsetManager(
* reset using resetOffsets.
*/
private def stripResetStreams {
- val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets.keys)
-
- systemStreamPartitionsToReset.foreach(systemStreamPartition => {
- val offset = lastProcessedOffsets(systemStreamPartition)
- info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
- })
+ val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets)
+
+ for ((taskName, systemStreamPartitions) <- systemStreamPartitionsToReset; systemStreamPartition <- systemStreamPartitions) {
+   // Every SSP here was selected from lastProcessedOffsets(taskName), so apply the map directly:
+   // using .get would log "Some(offset)" instead of the raw offset.
+   val offset = lastProcessedOffsets(taskName)(systemStreamPartition)
+   info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
+ }
- lastProcessedOffsets --= systemStreamPartitionsToReset
+ lastProcessedOffsets = lastProcessedOffsets.map {
+ case (taskName, sspToOffsets) => {
+ taskName -> (sspToOffsets -- systemStreamPartitionsToReset(taskName))
+ }
+ }
}
/**
- * Returns a set of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
+ * Returns a map of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
*/
- private def getSystemStreamPartitionsToReset(systemStreamPartitions: Iterable[SystemStreamPartition]): Set[SystemStreamPartition] = {
- systemStreamPartitions
- .filter(systemStreamPartition => {
- val systemStream = systemStreamPartition.getSystemStream
- offsetSettings
- .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
- .resetOffset
- }).toSet
+ private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: Map[TaskName, Map[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = {
+   taskNameTosystemStreamPartitions.map {
+     case (taskName, sspToOffsets) =>
+       // Keep only SSPs whose stream is configured to reset; a stream with no offset settings is an error.
+       val resetSSPs = sspToOffsets.keys.filter { systemStreamPartition =>
+         val systemStream = systemStreamPartition.getSystemStream
+         offsetSettings
+           .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
+           .resetOffset
+       }.toSet
+       taskName -> resetSSPs
+   }
}
/**
@@ -307,16 +347,18 @@ class OffsetManager(
* SystemStreamPartition, and populate startingOffsets.
*/
private def loadStartingOffsets {
- startingOffsets ++= lastProcessedOffsets
- // Group offset map according to systemName.
- .groupBy(_._1.getSystem)
- // Get next offsets for each system.
- .flatMap {
- case (systemName, systemStreamPartitionOffsets) =>
- systemAdmins
- .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
- .getOffsetsAfter(systemStreamPartitionOffsets)
+ // For each task, group its last processed offsets by system name and ask that system's
+ // SystemAdmin for the offsets that follow them; the result becomes the task's starting
+ // offsets. Throws SamzaException if a system has no registered SystemAdmin.
+ // Note: startingOffsets is reassigned wholesale here, so any entries it held are discarded.
+ startingOffsets = lastProcessedOffsets.map {
+ case (taskName, sspToOffsets) => {
+ taskName -> {
+ sspToOffsets.groupBy(_._1.getSystem).flatMap {
+ case (systemName, systemStreamPartitionOffsets) =>
+ systemAdmins
+ .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
+ .getOffsetsAfter(systemStreamPartitionOffsets)
+ }
+ }
}
+ }
}
/**
@@ -324,42 +366,47 @@ class OffsetManager(
* that was registered, but has no offset.
*/
private def loadDefaults {
- val allSSPs: Set[SystemStreamPartition] = systemStreamPartitions
- .values
- .flatten
- .toSet
-
- allSSPs.foreach(systemStreamPartition => {
- if (!startingOffsets.contains(systemStreamPartition)) {
- val systemStream = systemStreamPartition.getSystemStream
- val partition = systemStreamPartition.getPartition
- val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
- val systemStreamMetadata = offsetSetting.metadata
- val offsetType = offsetSetting.defaultOffset
-
- debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
-
- val systemStreamPartitionMetadata = systemStreamMetadata
- .getSystemStreamPartitionMetadata
- .get(partition)
-
- if (systemStreamPartitionMetadata != null) {
- val nextOffset = {
- val requested = systemStreamPartitionMetadata.getOffset(offsetType)
-
- if (requested == null) {
- warn("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset." format (offsetType, systemStreamPartition))
- systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING)
- } else requested
+ // For every (task, SSP) pair registered with this manager that still has no starting
+ // offset, derive one from the stream's configured default offset type and record it
+ // under that task. If the requested offset is null, UPCOMING is used instead.
+ // Throws SamzaException when a stream has no offset settings or a partition has no metadata.
+ val taskNameToSSPs: Map[TaskName, Set[SystemStreamPartition]] = systemStreamPartitions
+
+ taskNameToSSPs.foreach {
+ case (taskName, systemStreamPartitions) => {
+ systemStreamPartitions.foreach { systemStreamPartition =>
+ if (!startingOffsets.contains(taskName) || !startingOffsets(taskName).contains(systemStreamPartition)) {
+ val systemStream = systemStreamPartition.getSystemStream
+ val partition = systemStreamPartition.getPartition
+ val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
+ val systemStreamMetadata = offsetSetting.metadata
+ val offsetType = offsetSetting.defaultOffset
+
+ debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
+
+ val systemStreamPartitionMetadata = systemStreamMetadata
+ .getSystemStreamPartitionMetadata
+ .get(partition)
+
+ if (systemStreamPartitionMetadata != null) {
+ val nextOffset = {
+ val requested = systemStreamPartitionMetadata.getOffset(offsetType)
+
+ if (requested == null) {
+ warn("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset." format (offsetType, systemStreamPartition))
+ systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING)
+ } else requested
+ }
+
+ debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
+
+ startingOffsets.get(taskName) match {
+ case Some(sspToOffsets) => startingOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> nextOffset))
+ case None => startingOffsets += taskName -> Map(systemStreamPartition -> nextOffset)
+ }
+
+ } else {
+ // Report the SSP itself: the metadata is always null on this branch, so formatting
+ // it (as before) only ever printed "null".
+ throw new SamzaException("No metadata available for partition %s." format systemStreamPartition)
+ }
}
-
- debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
-
- startingOffsets += systemStreamPartition -> nextOffset
- } else {
- throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata)
}
}
- })
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index e4b14f4..6d73bb9 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,10 +19,10 @@
package org.apache.samza.config
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.util.Logging
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
object JobConfig {
// job config constants
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 24da35f..6916c5c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -51,14 +51,17 @@ class RunLoop(
// Messages come from the chooser with no connection to the TaskInstance they're bound for.
// Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them.
- val systemStreamPartitionToTaskInstance: Map[SystemStreamPartition, TaskInstance] = {
+ val systemStreamPartitionToTaskInstances = getSystemStreamPartitionToTaskInstancesMapping
+
+ /**
+ * Builds the routing table from each SystemStreamPartition to every TaskInstance that
+ * consumes it. One SSP may be assigned to several TaskInstances (e.g. a broadcast input),
+ * so each SSP maps to a list of TaskInstances rather than a single one.
+ */
+ def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance]] = {
// We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly
def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
- taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap
+ taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.groupBy(_._1).map {
+ case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList
+ }
}
-
/**
* Starts the run loop. Blocks until either the tasks request shutdown, or an
* unhandled exception is thrown.
@@ -111,11 +114,15 @@ class RunLoop(
trace("Processing incoming message envelope for SSP %s." format ssp)
metrics.envelopes.inc
- val taskInstance = systemStreamPartitionToTaskInstance(ssp)
- val coordinator = new ReadableCoordinator(taskInstance.taskName)
-
- taskInstance.process(envelope, coordinator)
- checkCoordinator(coordinator)
+ val taskInstances = systemStreamPartitionToTaskInstances(ssp)
+ taskInstances.foreach {
+ taskInstance =>
+ {
+ val coordinator = new ReadableCoordinator(taskInstance.taskName)
+ taskInstance.process(envelope, coordinator)
+ checkCoordinator(coordinator)
+ }
+ }
} else {
trace("No incoming message envelope was available.")
metrics.nullEnvelopes.inc
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 85b012b..61e228b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -335,8 +335,8 @@ object SamzaContainer extends Logging {
info("Got checkpoint manager: %s" format checkpointManager)
- val combinedOffsets: Map[SystemStreamPartition, String] =
- containerModel.getTasks.values().flatMap(_.getCheckpointedOffsets).toMap
+ val combinedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] =
+ containerModel.getTasks.map{case (taskName, taskModel) => taskName -> mapAsScalaMap(taskModel.getCheckpointedOffsets).toMap }.toMap
val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
@@ -508,6 +508,7 @@ object SamzaContainer extends Logging {
taskName = taskName,
config = config,
metrics = taskInstanceMetrics,
+ systemAdmins = systemAdmins,
consumerMultiplexer = consumerMultiplexer,
collector = collector,
containerContext = containerContext,
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c5a5ea5..d32a929 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -36,14 +36,15 @@ import org.apache.samza.task.StreamTask
import org.apache.samza.task.ReadableCoordinator
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.util.Logging
-
import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemAdmin
class TaskInstance(
task: StreamTask,
val taskName: TaskName,
config: Config,
metrics: TaskInstanceMetrics,
+ systemAdmins: Map[String, SystemAdmin],
consumerMultiplexer: SystemConsumers,
collector: TaskInstanceCollector,
containerContext: SamzaContainerContext,
@@ -69,9 +70,15 @@ class TaskInstance(
def getSamzaContainerContext = containerContext
override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
- offsetManager.startingOffsets += (ssp -> offset)
+ // Add or overwrite ssp's starting offset within this task's entry. If the task has no
+ // entry yet, start from an empty map instead of failing with NoSuchElementException.
+ val taskStartingOffsets = offsetManager.startingOffsets.getOrElse(taskName, Map[SystemStreamPartition, String]())
+ offsetManager.startingOffsets += taskName -> (taskStartingOffsets + (ssp -> offset))
}
}
+ // Per-SSP "caught up" flag for this TaskInstance; every SSP starts as false.
+ // checkCaughtUp sets an SSP's flag to true once an envelope's offset compares >= this
+ // task's starting offset for that SSP, or when the offsets are not comparable.
+ // process() skips envelopes from any SSP whose flag is still false. This lets several
+ // TaskInstances share one SSP (e.g. a broadcast input), each resuming from its own starting offset.
+ var ssp2catchedupMapping: scala.collection.mutable.Map[SystemStreamPartition, Boolean] = scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
+ systemStreamPartitions.foreach(ssp2catchedupMapping += _ -> false)
def registerMetrics {
debug("Registering metrics for taskName: %s" format taskName)
@@ -115,13 +122,13 @@ class TaskInstance(
debug("Registering consumers for taskName: %s" format taskName)
systemStreamPartitions.foreach(systemStreamPartition => {
- val offset = offsetManager.getStartingOffset(systemStreamPartition)
- .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
+ val offset = offsetManager.getStartingOffset(taskName, systemStreamPartition)
+ .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
consumerMultiplexer.register(systemStreamPartition, offset)
metrics.addOffsetGauge(systemStreamPartition, () => {
offsetManager
- .getLastProcessedOffset(systemStreamPartition)
- .getOrElse(null)
+ .getLastProcessedOffset(taskName, systemStreamPartition)
+ .orNull
})
})
}
@@ -129,15 +136,24 @@ class TaskInstance(
def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
metrics.processes.inc
- trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
-
- exceptionHandler.maybeHandle {
- task.process(envelope, collector, coordinator)
+ // While the envelope's SSP is not yet caught up, re-check it against this task's starting
+ // offset. An envelope from an SSP never registered with this task raises SamzaException.
+ if (!ssp2catchedupMapping.getOrElse(envelope.getSystemStreamPartition,
+ throw new SamzaException(envelope.getSystemStreamPartition + " is not registered!"))) {
+ checkCaughtUp(envelope)
}
- trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
+ // Only envelopes from a caught-up SSP reach the task and advance the offset manager;
+ // earlier ones are dropped. "process-calls" counts every envelope, while
+ // "messages-actually-processed" counts only the delivered ones.
+ if (ssp2catchedupMapping(envelope.getSystemStreamPartition)) {
+ metrics.messagesActuallyProcessed.inc
+
+ trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
+
+ exceptionHandler.maybeHandle {
+ task.process(envelope, collector, coordinator)
+ }
+
+ trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
- offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
+ offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
+ }
}
def window(coordinator: ReadableCoordinator) {
@@ -193,4 +209,35 @@ class TaskInstance(
override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName)
def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s]" format (taskName, isWindowableTask, isClosableTask)
+
+ /**
+ * Updates ssp2catchedupMapping for the envelope's SSP. The SSP is marked as caught up once
+ * offsetComparator(envelopeOffset, startingOffset) >= 0, where startingOffset is this
+ * TaskInstance's starting offset for that SSP. If systemAdmins is null, or the system's
+ * admin reports the two offsets as not comparable (returns null), the SSP is marked as
+ * caught up immediately.
+ *
+ * @throws SamzaException if this task has no starting offset for the SSP, or if no
+ * SystemAdmin is registered for the SSP's system.
+ */
+ private def checkCaughtUp(envelope: IncomingMessageEnvelope) = {
+ systemAdmins match {
+ case null => {
+ warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
+ ssp2catchedupMapping(envelope.getSystemStreamPartition) = true
+ }
+ case others => {
+ val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
+ .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
+ val system = envelope.getSystemStreamPartition.getSystem
+ // Look the admin up explicitly so a missing system fails with a descriptive
+ // SamzaException instead of Map.apply's bare NoSuchElementException.
+ val systemAdmin = others.getOrElse(system,
+ throw new SamzaException("Missing system admin for %s. Need system admin to check whether %s has caught up." format (system, envelope.getSystemStreamPartition)))
+ systemAdmin.offsetComparator(envelope.getOffset, startingOffset) match {
+ case null => {
+ info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
+ ssp2catchedupMapping(envelope.getSystemStreamPartition) = true // not comparable
+ }
+ case result => {
+ if (result >= 0) {
+ info(envelope.getSystemStreamPartition.toString + " is catched up.")
+ ssp2catchedupMapping(envelope.getSystemStreamPartition) = true
+ }
+ }
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index 9dc7051..8b86388 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -31,6 +31,7 @@ class TaskInstanceMetrics(
val commits = newCounter("commit-calls")
val windows = newCounter("window-calls")
val processes = newCounter("process-calls")
+ val messagesActuallyProcessed = newCounter("messages-actually-processed")
val sends = newCounter("send-calls")
val flushes = newCounter("flush-calls")
val messagesSent = newCounter("messages-sent")
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
deleted file mode 100644
index 44e95fc..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.stream
-
-import org.apache.samza.container.TaskName
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
- * the string representation of the Partition.
- */
-class GroupByPartition extends SystemStreamPartitionGrouper {
- override def group(ssps: util.Set[SystemStreamPartition]) = {
- ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
- .map(r => r._1 -> r._2.asJava)
- }
-}
-
-class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
- override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
deleted file mode 100644
index 3c0acad..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.stream
-
-import org.apache.samza.container.TaskName
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
- * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition
- */
-class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
- override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava)
-}
-
-class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
- override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index c29853d..460d11c 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -142,4 +142,8 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
def createCoordinatorStream(streamName: String) {
throw new UnsupportedOperationException("Method not implemented.")
}
+
+ /**
+ * Offsets of the file reader system are reported as not comparable: always returns null.
+ * TaskInstance.checkCaughtUp treats a null comparison as "already caught up".
+ */
+ override def offsetComparator(offset1: String , offset2: String) = {
+ null
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
new file mode 100644
index 0000000..2d6060e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+public class TestTaskConfigJava {
+
+ /**
+ * task.broadcast.inputs parsing: "system.stream#N" entries and inclusive
+ * "system.stream#[low-high]" ranges expand into SystemStreamPartitions. An entry
+ * without a "#partition" suffix is rejected with IllegalArgumentException.
+ */
+ @Test
+ public void testGetBroadcastSystemStreamPartitions() {
+ HashMap<String, String> map = new HashMap<String, String>();
+ map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14]");
+ Config config = new MapConfig(map);
+
+ TaskConfigJava taskConfig = new TaskConfigJava(config);
+ Set<SystemStreamPartition> systemStreamPartitionSet = taskConfig.getBroadcastSystemStreamPartitions();
+
+ HashSet<SystemStreamPartition> expected = new HashSet<SystemStreamPartition>();
+ expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4)));
+ expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5)));
+ // The [12-14] range includes both endpoints.
+ expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(12)));
+ expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(13)));
+ expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(14)));
+ assertEquals(expected, systemStreamPartitionSet);
+
+ // A broadcast input must name a partition; a bare "system.stream" is invalid.
+ map.put("task.broadcast.inputs", "kafka.foo");
+ taskConfig = new TaskConfigJava(new MapConfig(map));
+ boolean catchCorrectException = false;
+ try {
+ taskConfig.getBroadcastSystemStreamPartitions();
+ } catch (IllegalArgumentException e) {
+ catchCorrectException = true;
+ }
+ assertTrue(catchCorrectException);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
new file mode 100644
index 0000000..2a8d447
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+public class TestGroupByPartition {
+ // Fixture naming: "a" = SystemA, then a stream letter, then the partition id.
+ // NOTE(review): ac0 is StreamB partition 0 despite the "c" in its name.
+ SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
+ SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
+ SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
+ SystemStreamPartition ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1));
+ SystemStreamPartition ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2));
+ SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
+
+ /**
+ * Without broadcast inputs, SSPs are grouped into one task per partition id, named
+ * "Partition N"; an empty SSP set yields no tasks.
+ */
+ @Test
+ public void testLocalStreamsGroupedCorrectly() {
+ HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+ GroupByPartition grouper = new GroupByPartition();
+ Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs);
+ // empty SSP set gets empty groups
+ assertTrue(emptyResult.isEmpty());
+
+ Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0);
+ Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+ Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+ HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>();
+ partition0.add(aa0);
+ partition0.add(ac0);
+ expectedResult.put(new TaskName("Partition 0"), partition0);
+
+ HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>();
+ partition1.add(aa1);
+ partition1.add(ab1);
+ expectedResult.put(new TaskName("Partition 1"), partition1);
+
+ HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
+ partition2.add(aa2);
+ partition2.add(ab2);
+ expectedResult.put(new TaskName("Partition 2"), partition2);
+
+ assertEquals(expectedResult, result);
+ }
+
+ /**
+ * SSPs listed in task.broadcast.inputs (StreamA#0 and StreamB#1) are added to every task,
+ * in addition to the SSPs that task receives from grouping by partition.
+ */
+ @Test
+ public void testBroadcastStreamsGroupedCorrectly() {
+ HashMap<String, String> configMap = new HashMap<String, String>();
+ configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1");
+ Config config = new MapConfig(configMap);
+ GroupByPartition grouper = new GroupByPartition(config);
+
+ HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+ Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0);
+ Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+
+ Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+ HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>();
+ partition0.add(aa0); // broadcast stream
+ partition0.add(ac0);
+ partition0.add(ab1); // broadcast stream
+ expectedResult.put(new TaskName("Partition 0"), partition0);
+
+ HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>();
+ partition1.add(aa1);
+ partition1.add(ab1); // broadcast stream
+ partition1.add(aa0); // broadcast stream
+ expectedResult.put(new TaskName("Partition 1"), partition1);
+
+ HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
+ partition2.add(aa2);
+ partition2.add(ab2);
+ partition2.add(aa0); // broadcast stream
+ partition2.add(ab1); // broadcast stream
+ expectedResult.put(new TaskName("Partition 2"), partition2);
+
+ assertEquals(expectedResult, result);
+ }
+
+ /**
+ * A task is created only for a partition that has at least one non-broadcast SSP. Here ab2
+ * is the only non-broadcast SSP, so "Partition 2" is the sole task, and it also receives
+ * both broadcast SSPs (aa0 and ab1).
+ */
+ @Test
+ public void testNoTaskOnlyContainsBroadcastStreams() {
+ HashMap<String, String> configMap = new HashMap<String, String>();
+ configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1");
+ Config config = new MapConfig(configMap);
+ GroupByPartition grouper = new GroupByPartition(config);
+
+ HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+ Collections.addAll(allSSPs, aa0, ab1, ab2);
+ Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+
+ Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+ HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
+ partition2.add(aa0); // broadcast stream
+ partition2.add(ab1); // broadcast stream
+ partition2.add(ab2);
+ expectedResult.put(new TaskName("Partition 2"), partition2);
+
+ assertEquals(expectedResult, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
new file mode 100644
index 0000000..1bd14a4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+/**
+ * Tests for GroupBySystemStreamPartition, which is expected to create one task per
+ * SystemStreamPartition and to add configured broadcast SSPs to every task instead.
+ */
+public class TestGroupBySystemStreamPartition {
+ // Fixture naming: a<stream letter><partition>, e.g. ab0 is SystemA / StreamB / partition 0.
+ SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
+ SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
+ SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
+ SystemStreamPartition ab0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
+
+ /**
+ * Without broadcast inputs, an empty SSP set yields no tasks, and every SSP is placed
+ * in its own task named after the SSP's string form.
+ */
+ @Test
+ public void testLocalStreamGroupedCorrectly() {
+ HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+
+ GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition();
+ Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs);
+ assertTrue(emptyResult.isEmpty());
+
+ Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
+ Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+ Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+ HashSet<SystemStreamPartition> partitionaa0 = new HashSet<SystemStreamPartition>();
+ partitionaa0.add(aa0);
+ expectedResult.put(new TaskName(aa0.toString()), partitionaa0);
+
+ HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>();
+ partitionaa1.add(aa1);
+ expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+
+ HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>();
+ partitionaa2.add(aa2);
+ expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+
+ HashSet<SystemStreamPartition> partitionab0 = new HashSet<SystemStreamPartition>();
+ partitionab0.add(ab0);
+ expectedResult.put(new TaskName(ab0.toString()), partitionab0);
+
+ assertEquals(expectedResult, result);
+ }
+
+ /**
+ * With SystemA.StreamA#0 (aa0) configured as broadcast, aa0 gets no task of its own;
+ * instead it is added to the task of every remaining SSP.
+ */
+ @Test
+ public void testBroadcastStreamGroupedCorrectly() {
+ HashMap<String, String> configMap = new HashMap<String, String>();
+ configMap.put("task.broadcast.inputs", "SystemA.StreamA#0");
+ Config config = new MapConfig(configMap);
+
+ HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+ Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
+ GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition(config);
+ Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+
+ Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+ HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>();
+ partitionaa1.add(aa1);
+ partitionaa1.add(aa0); // broadcast stream
+ expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+
+ HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>();
+ partitionaa2.add(aa2);
+ partitionaa2.add(aa0); // broadcast stream
+ expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+
+ HashSet<SystemStreamPartition> partitionab0 = new HashSet<SystemStreamPartition>();
+ partitionab0.add(ab0);
+ partitionab0.add(aa0); // broadcast stream
+ expectedResult.put(new TaskName(ab0.toString()), partitionab0);
+
+ assertEquals(expectedResult, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index daa5eab..c00ef91 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -49,9 +49,9 @@ class TestOffsetManager {
val offsetManager = OffsetManager(systemStreamMetadata, config)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
- assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
- assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
- assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+ assertFalse(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).isDefined)
+ assertTrue(offsetManager.getStartingOffset(taskName, systemStreamPartition).isDefined)
+ assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
}
@Test
@@ -73,14 +73,14 @@ class TestOffsetManager {
assertEquals(taskName, checkpointManager.registered.head)
assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(taskName))
// Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
- assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
- assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
- offsetManager.update(systemStreamPartition, "46")
- assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
- offsetManager.update(systemStreamPartition, "47")
- assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+ assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
+ assertEquals("45", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
+ offsetManager.update(taskName, systemStreamPartition, "46")
+ assertEquals("46", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
+ offsetManager.update(taskName, systemStreamPartition, "47")
+ assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
// Should never update starting offset.
- assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
+ assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
offsetManager.checkpoint(taskName)
val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
@@ -103,11 +103,11 @@ class TestOffsetManager {
// Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
offsetManager.checkpoint(taskName)
assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
- offsetManager.update(systemStreamPartition, "46")
- offsetManager.update(systemStreamPartition, "47")
+ offsetManager.update(taskName, systemStreamPartition, "46")
+ offsetManager.update(taskName, systemStreamPartition, "47")
offsetManager.checkpoint(taskName)
assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
- offsetManager.update(systemStreamPartition, "48")
+ offsetManager.update(taskName, systemStreamPartition, "48")
offsetManager.checkpoint(taskName)
assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
}
@@ -133,7 +133,7 @@ class TestOffsetManager {
assertEquals(taskName, checkpointManager.registered.head)
assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName))
// Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
- assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+ assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
}
@Test
@@ -223,7 +223,7 @@ class TestOffsetManager {
offsetManager.start
assertTrue(checkpointManager.isStarted)
assertEquals(1, checkpointManager.registered.size)
- assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null))
+ assertNull(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition1).getOrElse(null))
}
@Test
@@ -236,7 +236,7 @@ class TestOffsetManager {
val offsetManager = new OffsetManager(offsetSettings = Map(ssp.getSystemStream -> settings))
offsetManager.register(taskName, Set(ssp))
offsetManager.start
- assertEquals(Some("13"), offsetManager.getStartingOffset(ssp))
+ assertEquals(Some("13"), offsetManager.getStartingOffset(taskName, ssp))
}
@@ -255,10 +255,7 @@ class TestOffsetManager {
override def stop { isStopped = true }
// Only for testing purposes - not present in actual checkpoint manager
- def getOffets: util.Map[SystemStreamPartition, String] =
- {
- checkpoint.getOffsets()
- }
+ def getOffets = Map(taskName -> mapAsScalaMap(checkpoint.getOffsets()).toMap)
}
}
@@ -281,6 +278,8 @@ class TestOffsetManager {
override def createCoordinatorStream(streamName: String) {
new UnsupportedOperationException("Method not implemented.")
}
+
+ override def offsetComparator(offset1: String, offset2: String) = null
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index b9d9e73..b4d6f35 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -20,6 +20,7 @@
package org.apache.samza.container
import org.junit.Test
+import org.junit.Assert._
import org.mockito.Matchers
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
@@ -243,4 +244,19 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
testMetrics.commits.getCount should equal(1L)
testMetrics.windows.getCount should equal(1L)
}
+
+ /**
+ * RunLoop should invert the task-instance -> SSPs assignment into SSP -> task instances,
+ * so an SSP shared by several task instances (ssp1 here, as with a broadcast input)
+ * maps to all of them.
+ */
+ @Test
+ def testGetSystemStreamPartitionToTaskInstancesMapping {
+ val ti0 = mock[TaskInstance]
+ val ti1 = mock[TaskInstance]
+ val ti2 = mock[TaskInstance]
+ // ti1 and ti2 both consume ssp1, so ssp1 must map to two task instances.
+ when(ti0.systemStreamPartitions).thenReturn(Set(ssp0))
+ when(ti1.systemStreamPartitions).thenReturn(Set(ssp1))
+ when(ti2.systemStreamPartitions).thenReturn(Set(ssp1))
+
+ val mockTaskInstances = Map(taskName0 -> ti0, taskName1 -> ti1, new TaskName("2") -> ti2)
+ // NOTE(review): the second RunLoop argument is null; presumably the mapping method
+ // never touches it — confirm against RunLoop's constructor.
+ val runLoop = new RunLoop(mockTaskInstances, null, new SamzaContainerMetrics)
+ // NOTE(review): List(ti1, ti2) assumes the mapping keeps the iteration order of
+ // mockTaskInstances (a small immutable Map iterates in insertion order); confirm.
+ val expected = Map(ssp0 -> List(ti0), ssp1 -> List(ti1, ti2))
+ assertEquals(expected, runLoop.getSystemStreamPartitionToTaskInstancesMapping)
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 4db6d5c..6de8710 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -152,6 +152,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
taskName,
config,
new TaskInstanceMetrics,
+ null,
consumerMultiplexer,
collector,
containerContext
http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 7caad28..5457f0e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -20,7 +20,6 @@
package org.apache.samza.container
import java.util.concurrent.ConcurrentHashMap
-
import org.apache.samza.SamzaException
import org.apache.samza.Partition
import org.apache.samza.checkpoint.OffsetManager
@@ -44,8 +43,9 @@ import org.apache.samza.task._
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Assertions.intercept
-
import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemAdmin
+import scala.collection.mutable.ListBuffer
class TestTaskInstance {
@Test
@@ -64,6 +64,7 @@ class TestTaskInstance {
new SerdeManager)
val systemStream = new SystemStream("test-system", "test-stream")
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val systemStreamPartitions = Set(systemStreamPartition)
// Pretend our last checkpointed (next) offset was 2.
val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
@@ -75,15 +76,17 @@ class TestTaskInstance {
taskName,
config,
new TaskInstanceMetrics,
+ null,
consumerMultiplexer,
collector,
containerContext,
- offsetManager)
+ offsetManager,
+ systemStreamPartitions = systemStreamPartitions)
// Pretend we got a message with offset 2 and next offset 3.
val coordinator = new ReadableCoordinator(taskName)
taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
// Check to see if the offset manager has been properly updated with offset 3.
- val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)
+ val lastProcessedOffset = offsetManager.getLastProcessedOffset(taskName, systemStreamPartition)
assertTrue(lastProcessedOffset.isDefined)
assertEquals("2", lastProcessedOffset.get)
}
@@ -156,6 +159,7 @@ class TestTaskInstance {
new SerdeManager)
val systemStream = new SystemStream("test-system", "test-stream")
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val systemStreamPartitions = Set(systemStreamPartition)
// Pretend our last checkpointed (next) offset was 2.
val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
@@ -170,10 +174,12 @@ class TestTaskInstance {
taskName,
config,
taskMetrics,
+ null,
consumerMultiplexer,
collector,
containerContext,
offsetManager,
+ systemStreamPartitions = systemStreamPartitions,
exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
val coordinator = new ReadableCoordinator(taskName)
@@ -210,6 +216,7 @@ class TestTaskInstance {
new SerdeManager)
val systemStream = new SystemStream("test-system", "test-stream")
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val systemStreamPartitions = Set(systemStreamPartition)
// Pretend our last checkpointed (next) offset was 2.
val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
@@ -224,10 +231,12 @@ class TestTaskInstance {
taskName,
config,
taskMetrics,
+ null,
consumerMultiplexer,
collector,
containerContext,
offsetManager,
+ systemStreamPartitions = systemStreamPartitions,
exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
val coordinator = new ReadableCoordinator(taskName)
@@ -242,7 +251,6 @@ class TestTaskInstance {
assertEquals(1L, getCount(group, classOf[FatalException].getName))
}
-
/**
* Tests that the init() method of task can override the existing offset
* assignment.
@@ -258,8 +266,7 @@ class TestTaskInstance {
override def init(config: Config, context: TaskContext): Unit = {
assertTrue("Can only update offsets for assigned partition",
- context.getSystemStreamPartitions.contains(partition1)
- )
+ context.getSystemStreamPartitions.contains(partition1))
context.setStartingOffset(partition1, "10")
}
@@ -278,22 +285,85 @@ class TestTaskInstance {
val offsetManager = new OffsetManager()
- offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
+ offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "0")
val taskInstance = new TaskInstance(
task,
taskName,
config,
metrics,
+ null,
consumers,
collector,
containerContext,
offsetManager,
- systemStreamPartitions = Set(partition0, partition1) )
+ systemStreamPartitions = Set(partition0, partition1))
taskInstance.initTask
- assertEquals(Some("0"), offsetManager.getStartingOffset(partition0))
- assertEquals(Some("10"), offsetManager.getStartingOffset(partition1))
+ assertEquals(Some("0"), offsetManager.getStartingOffset(taskName, partition0))
+ assertEquals(Some("10"), offsetManager.getStartingOffset(taskName, partition1))
+ }
+
+ /**
+ * Envelopes whose offset is older than the task's starting offset for their SSP must not
+ * reach StreamTask.process. partition1 starts at "100", so envelope3 (offset "1") is
+ * expected to be skipped, while both partition0 envelopes (starting offset "0") and
+ * envelope4 (offset "102") are delivered in order. MockSystemAdmin, passed in as the
+ * admin for "system", compares offsets numerically; presumably TaskInstance uses its
+ * offsetComparator for this check.
+ */
+ @Test
+ def testIgnoreMessagesOlderThanStartingOffsets {
+ val partition0 = new SystemStreamPartition("system", "stream", new Partition(0))
+ val partition1 = new SystemStreamPartition("system", "stream", new Partition(1))
+ val config = new MapConfig()
+ val chooser = new RoundRobinChooser()
+ val consumers = new SystemConsumers(chooser, consumers = Map.empty)
+ val producers = new SystemProducers(Map.empty, new SerdeManager())
+ val metrics = new TaskInstanceMetrics()
+ val taskName = new TaskName("testing")
+ val collector = new TaskInstanceCollector(producers)
+ val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+ val offsetManager = new OffsetManager()
+ offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
+ val systemAdmins = Map("system" -> new MockSystemAdmin)
+ // Records every envelope the task actually processes; the buffer is mutated, never reassigned.
+ val result = new ListBuffer[IncomingMessageEnvelope]
+
+ val task = new StreamTask {
+ def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+ result += envelope
+ }
+ }
+
+ val taskInstance = new TaskInstance(
+ task,
+ taskName,
+ config,
+ metrics,
+ systemAdmins,
+ consumers,
+ collector,
+ containerContext,
+ offsetManager,
+ systemStreamPartitions = Set(partition0, partition1))
+
+ val coordinator = new ReadableCoordinator(taskName)
+ val envelope1 = new IncomingMessageEnvelope(partition0, "1", null, null)
+ val envelope2 = new IncomingMessageEnvelope(partition0, "2", null, null)
+ val envelope3 = new IncomingMessageEnvelope(partition1, "1", null, null) // older than "100": dropped
+ val envelope4 = new IncomingMessageEnvelope(partition1, "102", null, null)
+
+ taskInstance.process(envelope1, coordinator)
+ taskInstance.process(envelope2, coordinator)
+ taskInstance.process(envelope3, coordinator)
+ taskInstance.process(envelope4, coordinator)
+
+ val expected = List(envelope1, envelope2, envelope4)
+ assertEquals(expected, result.toList)
+ }
+}
+
+/**
+ * Minimal SystemAdmin test double whose offsets are decimal strings.
+ * Metadata lookups return null and stream creation/validation are no-ops.
+ */
+class MockSystemAdmin extends SystemAdmin {
+ // Returns the input map unchanged (identity). NOTE(review): a real SystemAdmin presumably
+ // returns the offsets *after* the given ones; this mock does not advance them.
+ override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { offsets }
+ override def getSystemStreamMetadata(streamNames: java.util.Set[String]) = null
+ override def createCoordinatorStream(stream: String) = {}
+ override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {}
+ override def validateChangelogStream(topicName: String, numOfPartitions: Int) = {}
+
+ // Compares offsets numerically after parsing both as Long: negative, zero or positive when
+ // offset1 is less than, equal to or greater than offset2. Non-numeric offsets throw
+ // NumberFormatException from toLong.
+ override def offsetComparator(offset1: String, offset2: String) = {
+ offset1.toLong compare offset2.toLong
}
}