You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/09/05 21:50:37 UTC

[pulsar] branch master updated: Convert latency to millisecond in producer stats (#5096)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79ef292  Convert latency to millisecond in producer stats (#5096)
79ef292 is described below

commit 79ef292c943d08ed7133890f9b33232e9669d529
Author: Like <ke...@outlook.com>
AuthorDate: Fri Sep 6 05:50:32 2019 +0800

    Convert latency to millisecond in producer stats (#5096)
    
    * Convert latency to millisecond in producer stats
    
    * Format header
---
 .../client/impl/ProducerStatsRecorderImpl.java     |  2 +-
 .../client/impl/ProducerStatsRecorderImplTest.java | 57 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 25cb724..fde2d75 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -191,7 +191,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
     public void incrementNumAcksReceived(long latencyNs) {
         numAcksReceived.increment();
         synchronized (ds) {
-            ds.update(TimeUnit.NANOSECONDS.toMicros(latencyNs));
+            ds.update(TimeUnit.NANOSECONDS.toMillis(latencyNs));
         }
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
new file mode 100644
index 0000000..d654158
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit tests of {@link ProducerStatsRecorderImpl}.
+ */
+public class ProducerStatsRecorderImplTest {
+
+    @Test
+    public void testIncrementNumAcksReceived() throws Exception {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setStatsIntervalSeconds(1);
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        when(client.getConfiguration()).thenReturn(conf);
+        Timer timer = new HashedWheelTimer();
+        when(client.timer()).thenReturn(timer);
+        ProducerImpl<?> producer = mock(ProducerImpl.class);
+        when(producer.getTopic()).thenReturn("topic-test");
+        when(producer.getProducerName()).thenReturn("producer-test");
+        when(producer.getPendingQueueSize()).thenReturn(1);
+        ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+        ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(client, producerConfigurationData, producer);
+        long latencyNs = TimeUnit.SECONDS.toNanos(1);
+        recorder.incrementNumAcksReceived(latencyNs);
+        Thread.sleep(1200);
+        assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
+    }
+}