You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/03/08 09:17:56 UTC

[rocketmq] branch develop updated: [RIP-10] Add test cases for ConsumerRunningInfo (#923)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 02e2abb  [RIP-10] Add test cases for ConsumerRunningInfo (#923)
02e2abb is described below

commit 02e2abb5eecf76ea2cd5ab2d30a6927441c60067
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Fri Mar 8 17:17:51 2019 +0800

    [RIP-10] Add test cases for ConsumerRunningInfo (#923)
---
 .../protocol/body/ConsumerRunningInfoTest.java     | 108 +++++++++++++++++++++
 1 file changed, 108 insertions(+)

diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfoTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfoTest.java
new file mode 100644
index 0000000..b371893
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfoTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.rocketmq.common.protocol.heartbeat.ConsumeType.CONSUME_ACTIVELY;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumerRunningInfoTest {
+    // Exercises the JSON round trip and the static analyze* helpers of ConsumerRunningInfo on one fixture.
+    private ConsumerRunningInfo consumerRunningInfo;
+    // Maps "client_id" to the fixture; argument for analyzeRebalance and analyzeSubscription.
+    private TreeMap<String, ConsumerRunningInfo> criTable;
+    // Key of the single mqTable entry; kept as a field so testFromJson can look the entry up again.
+    private MessageQueue messageQueue;
+    // Rebuilds the fixture before each test: jstack, mqTable, statusTable, subscriptionSet, properties.
+    @Before
+    public void init() {
+        consumerRunningInfo = new ConsumerRunningInfo();
+        consumerRunningInfo.setJstack("test");
+        // One message queue mapped to a default-constructed ProcessQueueInfo.
+        TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
+        messageQueue = new MessageQueue("topicA","broker", 1);
+        mqTable.put(messageQueue, new ProcessQueueInfo());
+        consumerRunningInfo.setMqTable(mqTable);
+        // One default-constructed ConsumeStatus registered under the key "topicA".
+        TreeMap<String, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
+        statusTable.put("topicA", new ConsumeStatus());
+        consumerRunningInfo.setStatusTable(statusTable);
+        // A single default-constructed SubscriptionData.
+        TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
+        subscriptionSet.add(new SubscriptionData());
+        consumerRunningInfo.setSubscriptionSet(subscriptionSet);
+        // Note: the values stored below are a ConsumeType enum and a boxed long, not Strings.
+        Properties properties = new Properties();
+        properties.put(ConsumerRunningInfo.PROP_CONSUME_TYPE, CONSUME_ACTIVELY);
+        properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, System.currentTimeMillis());
+        consumerRunningInfo.setProperties(properties);
+        // Single-entry table keyed by "client_id".
+        criTable = new TreeMap<String, ConsumerRunningInfo>();
+        criTable.put("client_id", consumerRunningInfo);
+    }
+
+    @Test
+    public void testFromJson() { // serializes the fixture to JSON, parses it back, checks each populated part
+        String toJson = RemotingSerializable.toJson(consumerRunningInfo, true);
+        ConsumerRunningInfo fromJson = RemotingSerializable.fromJson(toJson, ConsumerRunningInfo.class);
+        // After the round trip the consume type is expected as the enum's name() string, not the enum itself.
+        assertThat(fromJson.getJstack()).isEqualTo("test");
+        assertThat(fromJson.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).isEqualTo(ConsumeType.CONSUME_ACTIVELY.name());
+        // The value stored under "topicA" must come back as exactly a ConsumeStatus.
+        ConsumeStatus consumeStatus = fromJson.getStatusTable().get("topicA");
+        assertThat(consumeStatus).isExactlyInstanceOf(ConsumeStatus.class);
+        // The first (and only) subscription must come back as exactly a SubscriptionData.
+        SubscriptionData subscription = fromJson.getSubscriptionSet().first();
+        assertThat(subscription).isExactlyInstanceOf(SubscriptionData.class);
+        // Looks the entry up with the same MessageQueue instance that init() inserted.
+        ProcessQueueInfo processQueueInfo = fromJson.getMqTable().get(messageQueue);
+        assertThat(processQueueInfo).isExactlyInstanceOf(ProcessQueueInfo.class);
+    }
+
+    @Test
+    public void testAnalyzeRebalance(){
+        boolean result = ConsumerRunningInfo.analyzeRebalance(criTable);
+        assertThat(result).isTrue(); // expects true for the single-client table built in init()
+    }
+
+    @Test
+    public void testAnalyzeProcessQueue(){
+        String result = ConsumerRunningInfo.analyzeProcessQueue("client_id", consumerRunningInfo);
+        assertThat(result).isEmpty(); // expects an empty result string for the fixture
+
+    }
+
+    @Test
+    public void testAnalyzeSubscription(){
+        boolean result = ConsumerRunningInfo.analyzeSubscription(criTable);
+        assertThat(result).isTrue(); // expects true for the single-client table built in init()
+    }
+
+
+}