You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/23 18:28:43 UTC
git commit: SAMZA-248; add producer support to test performance task
Repository: incubator-samza
Updated Branches:
refs/heads/master 06b3698d7 -> ac4feb22f
SAMZA-248; add producer support to test performance task
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ac4feb22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ac4feb22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ac4feb22
Branch: refs/heads/master
Commit: ac4feb22fac7b8a082facd5b27bdf86d9144310e
Parents: 06b3698
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Apr 23 09:28:32 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Apr 23 09:28:32 2014 -0700
----------------------------------------------------------------------
.../test/performance/TestPerformanceTask.scala | 56 ++++++++++++++++++--
1 file changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ac4feb22/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 12c5259..49dffa1 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.samza.test.performance
import org.apache.samza.task.TaskContext
@@ -8,8 +27,13 @@ import org.apache.samza.task.StreamTask
import org.apache.samza.task.TaskCoordinator
import org.apache.samza.config.Config
import grizzled.slf4j.Logging
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.util.Util
object TestPerformanceTask {
+ // No thread safety is needed for these variables because they're mutated in
+ // the process method, which is single threaded.
var messagesProcessed = 0
var startTime = 0L
}
@@ -20,10 +44,23 @@ object TestPerformanceTask {
* stored statically, so that all tasks in a single SamzaContainer increment
* the same counter.
*
- * The log interval is configured with task.log.interval, which defines how many
- * messages to process before printing a log line. The task will continue running
- * until task.max.messages have been processed, at which point it will shut
- * itself down.
+ * The log interval is configured with task.log.interval, which defines how
+ * many messages to process before printing a log line. The task will continue
+ * running until task.max.messages have been processed, at which point it will
+ * shut itself down.
+ *
+ * This task can also be configured to take incoming messages, and send them
+ * to an output stream. If the task is configured to do this, the outgoing
+ * message will have the same key and value as the incoming message. The
+ * output stream is configured with task.outputs=[system].[stream]. For
+ * example:
+ *
+ * <pre>
+ * task.outputs=kafka.MyOutputTopic
+ * </pre>
+ *
+ * If undefined, the task simply drops incoming messages, rather than
+ * forwarding them to the output stream.
*/
class TestPerformanceTask extends StreamTask with InitableTask with Logging {
import TestPerformanceTask._
@@ -38,9 +75,16 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
*/
var maxMessages = 100000
+ /**
+ * If defined, incoming messages will be forwarded to this SystemStream. If
+ * undefined, the task will not output messages.
+ */
+ var outputSystemStream: Option[SystemStream] = None
+
def init(config: Config, context: TaskContext) {
logInterval = config.getInt("task.log.interval", 10000)
maxMessages = config.getInt("task.max.messages", 100000)
+ outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_))
}
def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
@@ -48,6 +92,10 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
startTime = System.currentTimeMillis
}
+ if (outputSystemStream.isDefined) {
+ collector.send(new OutgoingMessageEnvelope(outputSystemStream.get, envelope.getKey, envelope.getMessage))
+ }
+
messagesProcessed += 1
if (messagesProcessed % logInterval == 0) {