You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/19 22:06:29 UTC

[samza] branch master updated: SAMZA-2133: Fix the TestKeyValuePerformance test. (#959)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e2cabc  SAMZA-2133: Fix the TestKeyValuePerformance test. (#959)
4e2cabc is described below

commit 4e2cabc1e5e46f67f666e09ac3d8146041aaff4c
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Tue Mar 19 15:06:25 2019 -0700

    SAMZA-2133: Fix the TestKeyValuePerformance test. (#959)
---
 .../test/performance/TestKeyValuePerformance.scala | 40 +++++++++++++++++-----
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 9c7657d..69811b0 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -20,24 +20,42 @@
 package org.apache.samza.test.performance
 
 import java.io.File
+import java.util
 import java.util.concurrent.TimeUnit
-import java.util.{Collections, UUID}
+import java.util.Collections
+import java.util.UUID
 
 import com.google.common.base.Stopwatch
+import com.google.common.collect.ImmutableList
+import com.google.common.collect.ImmutableMap
+import org.apache.commons.lang.RandomStringUtils
 import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig
+import org.apache.samza.config.MapConfig
 import org.apache.samza.config.StorageConfig._
 import org.apache.samza.container.TaskName
-import org.apache.samza.context.{ContainerContextImpl, JobContextImpl}
-import org.apache.samza.job.model.{ContainerModel, TaskModel}
+import org.apache.samza.context.ContainerContextImpl
+import org.apache.samza.context.JobContextImpl
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{ByteSerde, SerdeManager, UUIDSerde}
+import org.apache.samza.serializers.ByteSerde
+import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.serializers.UUIDSerde
 import org.apache.samza.storage.StorageEngineFactory
 import org.apache.samza.storage.StorageEngineFactory.StoreMode
-import org.apache.samza.storage.kv.{KeyValueStorageEngine, KeyValueStore}
-import org.apache.samza.system.{SystemProducer, SystemProducers, SystemStreamPartition}
+import org.apache.samza.storage.kv.KeyValueStorageEngine
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.{CommandLine, FileUtil, Logging, Util}
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.util.CommandLine
+import org.apache.samza.util.FileUtil
+import org.apache.samza.util.Logging
+import org.apache.samza.util. Util
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
 
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -60,6 +78,7 @@ import scala.util.Random
 
 object TestKeyValuePerformance extends Logging {
   val Encoding = "UTF-8"
+  val JobId = RandomStringUtils.random(10)
 
   val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map(
     "all-with-deletes" -> runTestAllWithDeletes,
@@ -77,7 +96,10 @@ object TestKeyValuePerformance extends Logging {
     tests.foreach{ test =>
       info("Running test: %s" format test)
       if(testMethods.contains(test)) {
-        invokeTest(test, testMethods(test), config.subset("test." + test + ".", true))
+        val testConfig: util.Map[String, String] = new MapConfig(config.subset("test." + test + ".", true))
+        val jobConfig: util.Map[String, String] = ImmutableMap.of(JobConfig.JOB_NAME, test, JobConfig.JOB_ID, JobId)
+        val combinedConfig: Config = new MapConfig(ImmutableList.of(testConfig, jobConfig))
+        invokeTest(test, testMethods(test), combinedConfig)
       } else {
         error("Invalid test method. valid methods are: %s" format testMethods.keys)
         throw new SamzaException("Unknown test method: %s" format test)