You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/22 22:17:16 UTC

git commit: SAMZA-247; move test performance task into samza-test main jar

Repository: incubator-samza
Updated Branches:
  refs/heads/master bbb0b1235 -> 73d604c43


SAMZA-247; move test performance task into samza-test main jar


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/73d604c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/73d604c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/73d604c4

Branch: refs/heads/master
Commit: 73d604c43812aa30a079bfaf278bcfed10579191
Parents: bbb0b12
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Apr 22 13:17:08 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Apr 22 13:17:08 2014 -0700

----------------------------------------------------------------------
 .../test/performance/TestPerformanceTask.scala  | 62 ++++++++++++++++++++
 .../TestSamzaContainerPerformance.scala         | 52 ----------------
 2 files changed, 62 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/73d604c4/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
new file mode 100644
index 0000000..12c5259
--- /dev/null
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -0,0 +1,62 @@
+package org.apache.samza.test.performance
+
+import org.apache.samza.task.TaskContext
+import org.apache.samza.task.InitableTask
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.config.Config
+import grizzled.slf4j.Logging
+
+object TestPerformanceTask {
+  var messagesProcessed = 0 // running count of messages, incremented by every task instance using this object
+  var startTime = 0L // set from System.currentTimeMillis on the first processed message; 0 until then
+}
+
+/**
+ * A little test task that prints how many messages a SamzaContainer has
+ * received, and over what period of time. The messages-processed count is
+ * stored statically, so that all tasks in a single SamzaContainer increment
+ * the same counter.
+ *
+ * The log interval is configured with task.log.interval, which defines how many
+ * messages to process before printing a log line. The task will continue running
+ * until task.max.messages have been processed, at which point it will shut
+ * itself down.
+ */
+class TestPerformanceTask extends StreamTask with InitableTask with Logging {
+  import TestPerformanceTask._
+
+  /** How many messages to process before a log message is printed. */
+  var logInterval = 10000
+
+  /** How many messages to process before shutting down. */
+  var maxMessages = 100000
+
+  /** Loads task.log.interval and task.max.messages, falling back to 10000 and 100000. */
+  def init(config: Config, context: TaskContext): Unit = {
+    logInterval = config.getInt("task.log.interval", 10000)
+    maxMessages = config.getInt("task.max.messages", 100000)
+  }
+
+  /**
+   * Counts one message in the shared counter, logs progress every logInterval
+   * messages, and requests shutdown once maxMessages have been counted.
+   */
+  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {
+    // The first message seen starts the clock used for the elapsed-time report.
+    if (startTime == 0) startTime = System.currentTimeMillis
+
+    messagesProcessed += 1
+
+    // Skip logging when the interval is 0: `% 0` would throw ArithmeticException.
+    if (logInterval != 0 && messagesProcessed % logInterval == 0) {
+      val seconds = (System.currentTimeMillis - startTime) / 1000
+      info("Processed %s messages in %s seconds." format (messagesProcessed, seconds))
+    }
+
+    // Request shutdown via the coordinator once the message budget is reached.
+    if (messagesProcessed >= maxMessages) coordinator.shutdown()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/73d604c4/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
index 3dc2630..4016768 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
@@ -102,55 +102,3 @@ class TestSamzaContainerPerformance extends Logging{
     job.waitForFinish(Int.MaxValue)
   }
 }
-
-object TestPerformanceTask {
-  var messagesProcessed = 0
-  var startTime = 0L
-}
-
-/**
- * A little test task that prints how many messages a SamzaContainer has
- * received, and over what period of time. The messages-processed count is
- * stored statically, so that all tasks in a single SamzaContainer increment
- * the same counter.
- *
- * The log interval is configured with task.log.interval, which defines how many
- * messages to process before printing a log line. The task will continue running
- * until task.max.messages have been processed, at which point it will shut
- * itself down.
- */
-class TestPerformanceTask extends StreamTask with InitableTask with Logging {
-  import TestPerformanceTask._
-
-  /**
-   * How many messages to process before a log message is printed.
-   */
-  var logInterval = 10000
-
-  /**
-   * How many messages to process before shutting down.
-   */
-  var maxMessages = 100000
-
-  def init(config: Config, context: TaskContext) {
-    logInterval = config.getInt("task.log.interval", 10000)
-    maxMessages = config.getInt("task.max.messages", 100000)
-  }
-
-  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
-    if (startTime == 0) {
-      startTime = System.currentTimeMillis
-    }
-
-    messagesProcessed += 1
-
-    if (messagesProcessed % logInterval == 0) {
-      val seconds = (System.currentTimeMillis - startTime) / 1000
-      info("Processed %s messages in %s seconds." format (messagesProcessed, seconds))
-    }
-
-    if (messagesProcessed >= maxMessages) {
-      coordinator.shutdown
-    }
-  }
-}
\ No newline at end of file