You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/19 22:06:29 UTC
[samza] branch master updated: SAMZA-2133: Fix the TestKeyValuePerformance test. (#959)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 4e2cabc SAMZA-2133: Fix the TestKeyValuePerformance test. (#959)
4e2cabc is described below
commit 4e2cabc1e5e46f67f666e09ac3d8146041aaff4c
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Tue Mar 19 15:06:25 2019 -0700
SAMZA-2133: Fix the TestKeyValuePerformance test. (#959)
---
.../test/performance/TestKeyValuePerformance.scala | 40 +++++++++++++++++-----
1 file changed, 31 insertions(+), 9 deletions(-)
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 9c7657d..69811b0 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -20,24 +20,42 @@
package org.apache.samza.test.performance
import java.io.File
+import java.util
import java.util.concurrent.TimeUnit
-import java.util.{Collections, UUID}
+import java.util.Collections
+import java.util.UUID
import com.google.common.base.Stopwatch
+import com.google.common.collect.ImmutableList
+import com.google.common.collect.ImmutableMap
+import org.apache.commons.lang.RandomStringUtils
import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig
+import org.apache.samza.config.MapConfig
import org.apache.samza.config.StorageConfig._
import org.apache.samza.container.TaskName
-import org.apache.samza.context.{ContainerContextImpl, JobContextImpl}
-import org.apache.samza.job.model.{ContainerModel, TaskModel}
+import org.apache.samza.context.ContainerContextImpl
+import org.apache.samza.context.JobContextImpl
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{ByteSerde, SerdeManager, UUIDSerde}
+import org.apache.samza.serializers.ByteSerde
+import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.serializers.UUIDSerde
import org.apache.samza.storage.StorageEngineFactory
import org.apache.samza.storage.StorageEngineFactory.StoreMode
-import org.apache.samza.storage.kv.{KeyValueStorageEngine, KeyValueStore}
-import org.apache.samza.system.{SystemProducer, SystemProducers, SystemStreamPartition}
+import org.apache.samza.storage.kv.KeyValueStorageEngine
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.{CommandLine, FileUtil, Logging, Util}
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.util.CommandLine
+import org.apache.samza.util.FileUtil
+import org.apache.samza.util.Logging
+import org.apache.samza.util. Util
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
import scala.collection.JavaConverters._
import scala.util.Random
@@ -60,6 +78,7 @@ import scala.util.Random
object TestKeyValuePerformance extends Logging {
val Encoding = "UTF-8"
+ val JobId = RandomStringUtils.random(10)
val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map(
"all-with-deletes" -> runTestAllWithDeletes,
@@ -77,7 +96,10 @@ object TestKeyValuePerformance extends Logging {
tests.foreach{ test =>
info("Running test: %s" format test)
if(testMethods.contains(test)) {
- invokeTest(test, testMethods(test), config.subset("test." + test + ".", true))
+ val testConfig: util.Map[String, String] = new MapConfig(config.subset("test." + test + ".", true))
+ val jobConfig: util.Map[String, String] = ImmutableMap.of(JobConfig.JOB_NAME, test, JobConfig.JOB_ID, JobId)
+ val combinedConfig: Config = new MapConfig(ImmutableList.of(testConfig, jobConfig))
+ invokeTest(test, testMethods(test), combinedConfig)
} else {
error("Invalid test method. valid methods are: %s" format testMethods.keys)
throw new SamzaException("Unknown test method: %s" format test)