You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/09/05 21:50:37 UTC
[pulsar] branch master updated: Convert latency to millisecond in
producer stats (#5096)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 79ef292 Convert latency to millisecond in producer stats (#5096)
79ef292 is described below
commit 79ef292c943d08ed7133890f9b33232e9669d529
Author: Like <ke...@outlook.com>
AuthorDate: Fri Sep 6 05:50:32 2019 +0800
Convert latency to millisecond in producer stats (#5096)
* Convert latency to millisecond in producer stats
* Format header
---
.../client/impl/ProducerStatsRecorderImpl.java | 2 +-
.../client/impl/ProducerStatsRecorderImplTest.java | 57 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 25cb724..fde2d75 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -191,7 +191,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
public void incrementNumAcksReceived(long latencyNs) {
numAcksReceived.increment();
synchronized (ds) {
- ds.update(TimeUnit.NANOSECONDS.toMicros(latencyNs));
+ ds.update(TimeUnit.NANOSECONDS.toMillis(latencyNs));
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
new file mode 100644
index 0000000..d654158
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit tests of {@link ProducerStatsRecorderImpl}.
+ */
+public class ProducerStatsRecorderImplTest {
+
+ @Test
+ public void testIncrementNumAcksReceived() throws Exception {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setStatsIntervalSeconds(1); // stats interval of one second
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ when(client.getConfiguration()).thenReturn(conf);
+ Timer timer = new HashedWheelTimer(); // NOTE(review): this timer is never stopped in the test; consider timer.stop() in a finally block
+ when(client.timer()).thenReturn(timer);
+ ProducerImpl<?> producer = mock(ProducerImpl.class);
+ when(producer.getTopic()).thenReturn("topic-test");
+ when(producer.getProducerName()).thenReturn("producer-test");
+ when(producer.getPendingQueueSize()).thenReturn(1);
+ ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+ ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(client, producerConfigurationData, producer);
+ long latencyNs = TimeUnit.SECONDS.toNanos(1); // one second of latency, expressed in nanoseconds
+ recorder.incrementNumAcksReceived(latencyNs);
+ Thread.sleep(1200); // NOTE(review): presumably waits out the 1 s stats interval so the max latency is published; confirm against ProducerStatsRecorderImpl
+ assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5); // 1 s should be reported as 1000 ms; NOTE(review): TestNG's signature is (actual, expected, delta), so the arguments are swapped, which only affects the failure message
+ }
+}