You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/22 22:17:16 UTC
git commit: SAMZA-247;
move test performance task into samza-test main jar
Repository: incubator-samza
Updated Branches:
refs/heads/master bbb0b1235 -> 73d604c43
SAMZA-247; move test performance task into samza-test main jar
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/73d604c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/73d604c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/73d604c4
Branch: refs/heads/master
Commit: 73d604c43812aa30a079bfaf278bcfed10579191
Parents: bbb0b12
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Apr 22 13:17:08 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Apr 22 13:17:08 2014 -0700
----------------------------------------------------------------------
.../test/performance/TestPerformanceTask.scala | 62 ++++++++++++++++++++
.../TestSamzaContainerPerformance.scala | 52 ----------------
2 files changed, 62 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/73d604c4/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
new file mode 100644
index 0000000..12c5259
--- /dev/null
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -0,0 +1,62 @@
+package org.apache.samza.test.performance
+
+import org.apache.samza.task.TaskContext
+import org.apache.samza.task.InitableTask
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.config.Config
+import grizzled.slf4j.Logging
+
+object TestPerformanceTask {
+ var messagesProcessed = 0
+ var startTime = 0L
+}
+
+/**
+ * A little test task that prints how many messages a SamzaContainer has
+ * received, and over what period of time. The messages-processed count is
+ * stored statically, so that all tasks in a single SamzaContainer increment
+ * the same counter.
+ *
+ * The log interval is configured with task.log.interval, which defines how many
+ * messages to process before printing a log line. The task will continue running
+ * until task.max.messages have been processed, at which point it will shut
+ * itself down.
+ */
+class TestPerformanceTask extends StreamTask with InitableTask with Logging {
+ import TestPerformanceTask._
+
+ /**
+ * How many messages to process before a log message is printed.
+ */
+ var logInterval = 10000
+
+ /**
+ * How many messages to process before shutting down.
+ */
+ var maxMessages = 100000
+
+ def init(config: Config, context: TaskContext) {
+ logInterval = config.getInt("task.log.interval", 10000)
+ maxMessages = config.getInt("task.max.messages", 100000)
+ }
+
+ def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+ if (startTime == 0) {
+ startTime = System.currentTimeMillis
+ }
+
+ messagesProcessed += 1
+
+ if (messagesProcessed % logInterval == 0) {
+ val seconds = (System.currentTimeMillis - startTime) / 1000
+ info("Processed %s messages in %s seconds." format (messagesProcessed, seconds))
+ }
+
+ if (messagesProcessed >= maxMessages) {
+ coordinator.shutdown
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/73d604c4/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
index 3dc2630..4016768 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
@@ -102,55 +102,3 @@ class TestSamzaContainerPerformance extends Logging{
job.waitForFinish(Int.MaxValue)
}
}
-
-object TestPerformanceTask {
- var messagesProcessed = 0
- var startTime = 0L
-}
-
-/**
- * A little test task that prints how many messages a SamzaContainer has
- * received, and over what period of time. The messages-processed count is
- * stored statically, so that all tasks in a single SamzaContainer increment
- * the same counter.
- *
- * The log interval is configured with task.log.interval, which defines how many
- * messages to process before printing a log line. The task will continue running
- * until task.max.messages have been processed, at which point it will shut
- * itself down.
- */
-class TestPerformanceTask extends StreamTask with InitableTask with Logging {
- import TestPerformanceTask._
-
- /**
- * How many messages to process before a log message is printed.
- */
- var logInterval = 10000
-
- /**
- * How many messages to process before shutting down.
- */
- var maxMessages = 100000
-
- def init(config: Config, context: TaskContext) {
- logInterval = config.getInt("task.log.interval", 10000)
- maxMessages = config.getInt("task.max.messages", 100000)
- }
-
- def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
- if (startTime == 0) {
- startTime = System.currentTimeMillis
- }
-
- messagesProcessed += 1
-
- if (messagesProcessed % logInterval == 0) {
- val seconds = (System.currentTimeMillis - startTime) / 1000
- info("Processed %s messages in %s seconds." format (messagesProcessed, seconds))
- }
-
- if (messagesProcessed >= maxMessages) {
- coordinator.shutdown
- }
- }
-}
\ No newline at end of file