You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/23 18:28:43 UTC

git commit: SAMZA-248; add producer support to test performance task

Repository: incubator-samza
Updated Branches:
  refs/heads/master 06b3698d7 -> ac4feb22f


SAMZA-248; add producer support to test performance task


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ac4feb22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ac4feb22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ac4feb22

Branch: refs/heads/master
Commit: ac4feb22fac7b8a082facd5b27bdf86d9144310e
Parents: 06b3698
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Apr 23 09:28:32 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Apr 23 09:28:32 2014 -0700

----------------------------------------------------------------------
 .../test/performance/TestPerformanceTask.scala  | 56 ++++++++++++++++++--
 1 file changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ac4feb22/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 12c5259..49dffa1 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.samza.test.performance
 
 import org.apache.samza.task.TaskContext
@@ -8,8 +27,13 @@ import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.config.Config
 import grizzled.slf4j.Logging
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.util.Util
 
 object TestPerformanceTask {
+  // No thread safety is needed for these variables because they're mutated in
+  // the process method, which is single-threaded.
   var messagesProcessed = 0
   var startTime = 0L
 }
@@ -20,10 +44,23 @@ object TestPerformanceTask {
  * stored statically, so that all tasks in a single SamzaContainer increment
  * the same counter.
  *
- * The log interval is configured with task.log.interval, which defines how many
- * messages to process before printing a log line. The task will continue running
- * until task.max.messages have been processed, at which point it will shut
- * itself down.
+ * The log interval is configured with task.log.interval, which defines how
+ * many messages to process before printing a log line. The task will continue
+ * running until task.max.messages have been processed, at which point it will
+ * shut itself down.
+ *
+ * This task can also be configured to forward incoming messages to an
+ * output stream. When forwarding is enabled, each outgoing message has the
+ * same key and value as the incoming message it was copied from. The
+ * output stream is configured with task.outputs=[system].[stream]. For
+ * example:
+ *
+ * <pre>
+ *   task.outputs=kafka.MyOutputTopic
+ * </pre>
+ *
+ * If task.outputs is undefined, the task simply drops incoming messages
+ * rather than forwarding them to an output stream.
  */
 class TestPerformanceTask extends StreamTask with InitableTask with Logging {
   import TestPerformanceTask._
@@ -38,9 +75,16 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
    */
   var maxMessages = 100000
 
+  /**
+   * If defined, incoming messages will be forwarded to this SystemStream. If
+   * undefined, the task will not output messages.
+   */
+  var outputSystemStream: Option[SystemStream] = None
+
   def init(config: Config, context: TaskContext) {
     logInterval = config.getInt("task.log.interval", 10000)
     maxMessages = config.getInt("task.max.messages", 100000)
+    outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_))
   }
 
   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
@@ -48,6 +92,10 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
       startTime = System.currentTimeMillis
     }
 
+    if (outputSystemStream.isDefined) {
+      collector.send(new OutgoingMessageEnvelope(outputSystemStream.get, envelope.getKey, envelope.getMessage))
+    }
+
     messagesProcessed += 1
 
     if (messagesProcessed % logInterval == 0) {