You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/05/09 00:19:40 UTC

samza git commit: SAMZA-1196; Fix TestJmxReporter

Repository: samza
Updated Branches:
  refs/heads/master 29c4844fd -> 9d66772d1


SAMZA-1196; Fix TestJmxReporter

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #140 from shanthoosh/fix-test-jmx-reporter


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d66772d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d66772d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d66772d

Branch: refs/heads/master
Commit: 9d66772d155842cf92f1de859b9e815db6a40fc8
Parents: 29c4844
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Mon May 8 17:19:36 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon May 8 17:19:36 2017 -0700

----------------------------------------------------------------------
 .../samza/metrics/reporter/JmxReporter.scala    |  2 +-
 .../metrics/reporter/TestJmxReporter.scala      | 80 ++++++++------------
 2 files changed, 33 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9d66772d/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index ed401de..7da8a9c 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -88,7 +88,7 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
   def registerBean(bean: MetricMBean) {
     if (!server.isRegistered(bean.objectName)) {
       debug("Registering MBean for %s." format bean.objectName)
-      server.registerMBean(bean, bean.objectName);
+      server.registerMBean(bean, bean.objectName)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d66772d/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
index 431e14c..9453524 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -19,65 +19,49 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.junit.Assert._
-import org.junit.AfterClass
-import org.junit.BeforeClass
-import org.junit.Test
-import org.apache.samza.task.TaskContext
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.JvmMetrics
-
-import java.lang.management.ManagementFactory
-import java.rmi.registry.LocateRegistry
-
+import javax.management.MBeanServer
 import javax.management.ObjectName
-import javax.management.remote.JMXServiceURL
-import javax.management.remote.JMXConnectorServerFactory
-import javax.management.remote.JMXConnectorServer
-import javax.management.remote.JMXConnectorFactory
+import junit.framework.Assert.assertEquals
+import org.apache.samza.metrics.JmxUtil
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.junit.Test
+import org.mockito.Matchers
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.times
+import org.mockito.Mockito.verify
+import org.mockito.Mockito.when
+import org.mockito.Mockito.reset
 
-import scala.collection.JavaConverters._
+class TestJmxReporter {
 
-object TestJmxReporter {
-  val port = 4500
-  val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port)
-  var cs: JMXConnectorServer = null
+  val REPORTER_SOURCE = "test"
 
-  @BeforeClass
-  def beforeSetupServers {
-    LocateRegistry.createRegistry(port)
-    val mbs = ManagementFactory.getPlatformMBeanServer()
-    cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs)
-    cs.start
-  }
+  @Test
+  def testJmxReporter {
+    val metricGroup = "org.apache.samza.metrics.JvmMetrics"
+    val metricName = "mem-non-heap-used-mb"
+    val objectName: ObjectName = JmxUtil.getObjectName(metricGroup, metricName, REPORTER_SOURCE)
 
-  @AfterClass
-  def afterCleanLogDirs {
-    if (cs != null) {
-      cs.stop
-    }
-  }
-}
+    val registry: MetricsRegistryMap = new MetricsRegistryMap
+    val mBeanServerMock: MBeanServer = mock(classOf[MBeanServer])
 
-class TestJmxReporter {
-  import TestJmxReporter.url
+    // Create dummy test metric.
+    registry.newCounter(metricGroup, metricName)
 
-  // TODO: Fix in SAMZA-1196
-  //@Test
-  def testJmxReporter {
-    val registry = new MetricsRegistryMap
-    val jvm = new JvmMetrics(registry)
-    val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]().asJava))
+    when(mBeanServerMock.isRegistered(objectName)).thenReturn(false)
 
-    reporter.register("test", registry)
+    val reporter = new JmxReporter(mBeanServerMock)
+    reporter.register(REPORTER_SOURCE, registry)
     reporter.start
-    jvm.run
 
-    val mbserver = JMXConnectorFactory.connect(url).getMBeanServerConnection
-    val stateViaJMX = mbserver.getAttribute(new ObjectName("org.apache.samza.metrics.JvmMetrics:type=test,name=mem-non-heap-used-mb"), "Value").asInstanceOf[Float]
+    verify(mBeanServerMock, times(1)).registerMBean(Matchers.anyObject(), Matchers.eq(objectName))
+
+    reset(mBeanServerMock)
+    // Create dummy counter to test metrics reporting through listener.
+    registry.newCounter(metricGroup, metricName)
 
-    assertTrue(stateViaJMX > 0)
+    assertEquals(1, registry.listeners.size)
+    verify(mBeanServerMock, times(1)).registerMBean(Matchers.anyObject(), Matchers.eq(objectName))
 
     reporter.stop
   }