You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:09 UTC
[13/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
deleted file mode 100644
index 72e1b96..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
- */
-package com.alibaba.rocketmq.common.protocol.route;
-
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class TopicRouteData extends RemotingSerializable {
- private String orderTopicConf;
- private List<QueueData> queueDatas;
- private List<BrokerData> brokerDatas;
- private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-
-
- public TopicRouteData cloneTopicRouteData() {
- TopicRouteData topicRouteData = new TopicRouteData();
- topicRouteData.setQueueDatas(new ArrayList<QueueData>());
- topicRouteData.setBrokerDatas(new ArrayList<BrokerData>());
- topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
- topicRouteData.setOrderTopicConf(this.orderTopicConf);
-
- if (this.queueDatas != null) {
- topicRouteData.getQueueDatas().addAll(this.queueDatas);
- }
-
- if (this.brokerDatas != null) {
- topicRouteData.getBrokerDatas().addAll(this.brokerDatas);
- }
-
- if (this.filterServerTable != null) {
- topicRouteData.getFilterServerTable().putAll(this.filterServerTable);
- }
-
- return topicRouteData;
- }
-
-
- public List<QueueData> getQueueDatas() {
- return queueDatas;
- }
-
-
- public void setQueueDatas(List<QueueData> queueDatas) {
- this.queueDatas = queueDatas;
- }
-
-
- public List<BrokerData> getBrokerDatas() {
- return brokerDatas;
- }
-
-
- public void setBrokerDatas(List<BrokerData> brokerDatas) {
- this.brokerDatas = brokerDatas;
- }
-
- public HashMap<String, List<String>> getFilterServerTable() {
- return filterServerTable;
- }
-
- public void setFilterServerTable(HashMap<String, List<String>> filterServerTable) {
- this.filterServerTable = filterServerTable;
- }
-
- public String getOrderTopicConf() {
- return orderTopicConf;
- }
-
- public void setOrderTopicConf(String orderTopicConf) {
- this.orderTopicConf = orderTopicConf;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((brokerDatas == null) ? 0 : brokerDatas.hashCode());
- result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode());
- result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode());
- result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TopicRouteData other = (TopicRouteData) obj;
- if (brokerDatas == null) {
- if (other.brokerDatas != null)
- return false;
- } else if (!brokerDatas.equals(other.brokerDatas))
- return false;
- if (orderTopicConf == null) {
- if (other.orderTopicConf != null)
- return false;
- } else if (!orderTopicConf.equals(other.orderTopicConf))
- return false;
- if (queueDatas == null) {
- if (other.queueDatas != null)
- return false;
- } else if (!queueDatas.equals(other.queueDatas))
- return false;
- if (filterServerTable == null) {
- if (other.filterServerTable != null)
- return false;
- } else if (!filterServerTable.equals(other.filterServerTable))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas
- + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
deleted file mode 100644
index 86bdd3d..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.topic;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-
-public class OffsetMovedEvent extends RemotingSerializable {
- private String consumerGroup;
- private MessageQueue messageQueue;
- private long offsetRequest;
- private long offsetNew;
-
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
-
-
- public void setMessageQueue(MessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
-
- public long getOffsetRequest() {
- return offsetRequest;
- }
-
-
- public void setOffsetRequest(long offsetRequest) {
- this.offsetRequest = offsetRequest;
- }
-
-
- public long getOffsetNew() {
- return offsetNew;
- }
-
-
- public void setOffsetNew(long offsetNew) {
- this.offsetNew = offsetNew;
- }
-
-
- @Override
- public String toString() {
- return "OffsetMovedEvent [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
- + ", offsetRequest=" + offsetRequest + ", offsetNew=" + offsetNew + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java b/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
deleted file mode 100644
index 8fc4e76..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.queue;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Comparator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * thread safe
- *
- * @author lansheng.zj
- */
-public class ConcurrentTreeMap<K, V> {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ReentrantLock lock;
- private TreeMap<K, V> tree;
- private RoundQueue<K> roundQueue;
-
-
- public ConcurrentTreeMap(int capacity, Comparator<? super K> comparator) {
- tree = new TreeMap<K, V>(comparator);
- roundQueue = new RoundQueue<K>(capacity);
- lock = new ReentrantLock(true);
- }
-
-
- public Map.Entry<K, V> pollFirstEntry() {
- lock.lock();
- try {
- return tree.pollFirstEntry();
- } finally {
- lock.unlock();
- }
- }
-
-
- public V putIfAbsentAndRetExsit(K key, V value) {
- lock.lock();
- try {
- if (roundQueue.put(key)) {
- V exsit = tree.get(key);
- if (null == exsit) {
- tree.put(key, value);
- exsit = value;
- }
- log.warn("putIfAbsentAndRetExsit success. {}", key);
- return exsit;
- }
-
- else {
- V exsit = tree.get(key);
- return exsit;
- }
- } finally {
- lock.unlock();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java b/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
deleted file mode 100644
index a3783ba..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.queue;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-
-/**
- * not thread safe
- *
- * @author lansheng.zj
- */
-public class RoundQueue<E> {
-
- private Queue<E> queue;
- private int capacity;
-
-
- public RoundQueue(int capacity) {
- this.capacity = capacity;
- queue = new LinkedList<E>();
- }
-
-
- public boolean put(E e) {
- boolean ok = false;
- if (!queue.contains(e)) {
- if (queue.size() >= capacity) {
- queue.poll();
- }
- queue.add(e);
- ok = true;
- }
-
- return ok;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java b/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
deleted file mode 100644
index aa0bc54..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.running;
-
-public enum RunningStats {
- commitLogMaxOffset,
- commitLogMinOffset,
- commitLogDiskRatio,
- consumeQueueDiskRatio,
- scheduleMessageOffset,
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
deleted file mode 100644
index 89eefa5..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.stats;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import org.slf4j.Logger;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-public class MomentStatsItem {
-
- private final AtomicLong value = new AtomicLong(0);
-
- private final String statsName;
- private final String statsKey;
- private final ScheduledExecutorService scheduledExecutorService;
- private final Logger log;
-
-
- public MomentStatsItem(String statsName, String statsKey,
- ScheduledExecutorService scheduledExecutorService, Logger log) {
- this.statsName = statsName;
- this.statsKey = statsKey;
- this.scheduledExecutorService = scheduledExecutorService;
- this.log = log;
- }
-
-
- public void init() {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtMinutes();
-
- MomentStatsItem.this.value.set(0);
- } catch (Throwable e) {
- }
- }
- }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
- }
-
-
- public void printAtMinutes() {
- log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d",
- this.statsName,
- this.statsKey,
- this.value.get()));
- }
-
- public AtomicLong getValue() {
- return value;
- }
-
-
- public String getStatsKey() {
- return statsKey;
- }
-
-
- public String getStatsName() {
- return statsName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
deleted file mode 100644
index fde88cd..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.stats;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import org.slf4j.Logger;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-public class MomentStatsItemSet {
- private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable =
- new ConcurrentHashMap<String, MomentStatsItem>(128);
- private final String statsName;
- private final ScheduledExecutorService scheduledExecutorService;
- private final Logger log;
-
-
- public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
- this.statsName = statsName;
- this.scheduledExecutorService = scheduledExecutorService;
- this.log = log;
- this.init();
- }
-
- public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() {
- return statsItemTable;
- }
-
- public String getStatsName() {
- return statsName;
- }
-
- public void init() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtMinutes();
- } catch (Throwable e) {
- }
- }
- }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
- }
-
- private void printAtMinutes() {
- Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, MomentStatsItem> next = it.next();
- next.getValue().printAtMinutes();
- }
- }
-
- public void setValue(final String statsKey, final int value) {
- MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
- statsItem.getValue().set(value);
- }
-
- public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
- MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
- if (null == statsItem) {
- statsItem =
- new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
- MomentStatsItem prev = this.statsItemTable.put(statsKey, statsItem);
-
- if (null == prev) {
-
- // statsItem.init();
- }
- }
-
- return statsItem;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
deleted file mode 100644
index 1c99699..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.stats;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import org.slf4j.Logger;
-
-import java.util.LinkedList;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-public class StatsItem {
-
- private final AtomicLong value = new AtomicLong(0);
-
- private final AtomicLong times = new AtomicLong(0);
-
- private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
-
-
- private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
-
-
- private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();
-
- private final String statsName;
- private final String statsKey;
- private final ScheduledExecutorService scheduledExecutorService;
- private final Logger log;
-
-
- public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService,
- Logger log) {
- this.statsName = statsName;
- this.statsKey = statsKey;
- this.scheduledExecutorService = scheduledExecutorService;
- this.log = log;
- }
-
- public StatsSnapshot getStatsDataInMinute() {
- return computeStatsData(this.csListMinute);
- }
-
- private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
- StatsSnapshot statsSnapshot = new StatsSnapshot();
- synchronized (csList) {
- double tps = 0;
- double avgpt = 0;
- long sum = 0;
- if (!csList.isEmpty()) {
- CallSnapshot first = csList.getFirst();
- CallSnapshot last = csList.getLast();
- sum = last.getValue() - first.getValue();
- tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
-
- long timesDiff = last.getTimes() - first.getTimes();
- if (timesDiff > 0) {
- avgpt = (sum * 1.0d) / timesDiff;
- }
- }
-
- statsSnapshot.setSum(sum);
- statsSnapshot.setTps(tps);
- statsSnapshot.setAvgpt(avgpt);
- }
-
- return statsSnapshot;
- }
-
- public StatsSnapshot getStatsDataInHour() {
- return computeStatsData(this.csListHour);
- }
-
- public StatsSnapshot getStatsDataInDay() {
- return computeStatsData(this.csListDay);
- }
-
- public void init() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- samplingInSeconds();
- } catch (Throwable e) {
- }
- }
- }, 0, 10, TimeUnit.SECONDS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- samplingInMinutes();
- } catch (Throwable e) {
- }
- }
- }, 0, 10, TimeUnit.MINUTES);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- samplingInHour();
- } catch (Throwable e) {
- }
- }
- }, 0, 1, TimeUnit.HOURS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtMinutes();
- } catch (Throwable ignored) {
- }
- }
- }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtHour();
- } catch (Throwable ignored) {
- }
- }
- }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtDay();
- } catch (Throwable ignored) {
- }
- }
- }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
- }
-
- public void samplingInSeconds() {
- synchronized (this.csListMinute) {
- this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
- .get()));
- if (this.csListMinute.size() > 7) {
- this.csListMinute.removeFirst();
- }
- }
- }
-
- public void samplingInMinutes() {
- synchronized (this.csListHour) {
- this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
- .get()));
- if (this.csListHour.size() > 7) {
- this.csListHour.removeFirst();
- }
- }
- }
-
- public void samplingInHour() {
- synchronized (this.csListDay) {
- this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
- .get()));
- if (this.csListDay.size() > 25) {
- this.csListDay.removeFirst();
- }
- }
- }
-
- public void printAtMinutes() {
- StatsSnapshot ss = computeStatsData(this.csListMinute);
- log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
- this.statsName,
- this.statsKey,
- ss.getSum(),
- ss.getTps(),
- ss.getAvgpt()));
- }
-
- public void printAtHour() {
- StatsSnapshot ss = computeStatsData(this.csListHour);
- log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
- this.statsName,
- this.statsKey,
- ss.getSum(),
- ss.getTps(),
- ss.getAvgpt()));
- }
-
- public void printAtDay() {
- StatsSnapshot ss = computeStatsData(this.csListDay);
- log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
- this.statsName,
- this.statsKey,
- ss.getSum(),
- ss.getTps(),
- ss.getAvgpt()));
- }
-
- public AtomicLong getValue() {
- return value;
- }
-
-
- public String getStatsKey() {
- return statsKey;
- }
-
-
- public String getStatsName() {
- return statsName;
- }
-
-
- public AtomicLong getTimes() {
- return times;
- }
-}
-
-
-class CallSnapshot {
- private final long timestamp;
- private final long times;
-
- private final long value;
-
-
- public CallSnapshot(long timestamp, long times, long value) {
- super();
- this.timestamp = timestamp;
- this.times = times;
- this.value = value;
- }
-
-
- public long getTimestamp() {
- return timestamp;
- }
-
-
- public long getTimes() {
- return times;
- }
-
-
- public long getValue() {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
deleted file mode 100644
index 8a2b2a1..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.stats;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import org.slf4j.Logger;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-public class StatsItemSet {
- private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable =
- new ConcurrentHashMap<String, StatsItem>(128);
-
- private final String statsName;
- private final ScheduledExecutorService scheduledExecutorService;
- private final Logger log;
-
-
- public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
- this.statsName = statsName;
- this.scheduledExecutorService = scheduledExecutorService;
- this.log = log;
- this.init();
- }
-
- public void init() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- samplingInSeconds();
- } catch (Throwable e) {
- }
- }
- }, 0, 10, TimeUnit.SECONDS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- samplingInMinutes();
- } catch (Throwable e) {
- }
- }
- }, 0, 10, TimeUnit.MINUTES);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- samplingInHour();
- } catch (Throwable e) {
- }
- }
- }, 0, 1, TimeUnit.HOURS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtMinutes();
- } catch (Throwable e) {
- }
- }
- }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtHour();
- } catch (Throwable e) {
- }
- }
- }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- printAtDay();
- } catch (Throwable e) {
- }
- }
- }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
- }
-
- private void samplingInSeconds() {
- Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, StatsItem> next = it.next();
- next.getValue().samplingInSeconds();
- }
- }
-
- private void samplingInMinutes() {
- Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, StatsItem> next = it.next();
- next.getValue().samplingInMinutes();
- }
- }
-
- private void samplingInHour() {
- Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, StatsItem> next = it.next();
- next.getValue().samplingInHour();
- }
- }
-
- private void printAtMinutes() {
- Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, StatsItem> next = it.next();
- next.getValue().printAtMinutes();
- }
- }
-
- private void printAtHour() {
- Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, StatsItem> next = it.next();
- next.getValue().printAtHour();
- }
- }
-
- private void printAtDay() {
- Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, StatsItem> next = it.next();
- next.getValue().printAtDay();
- }
- }
-
- public void addValue(final String statsKey, final int incValue, final int incTimes) {
- StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
- statsItem.getValue().addAndGet(incValue);
- statsItem.getTimes().addAndGet(incTimes);
- }
-
- public StatsItem getAndCreateStatsItem(final String statsKey) {
- StatsItem statsItem = this.statsItemTable.get(statsKey);
- if (null == statsItem) {
- statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
- StatsItem prev = this.statsItemTable.put(statsKey, statsItem);
-
- if (null == prev) {
-
- // statsItem.init();
- }
- }
-
- return statsItem;
- }
-
- public StatsSnapshot getStatsDataInMinute(final String statsKey) {
- StatsItem statsItem = this.statsItemTable.get(statsKey);
- if (null != statsItem) {
- return statsItem.getStatsDataInMinute();
- }
- return new StatsSnapshot();
- }
-
- public StatsSnapshot getStatsDataInHour(final String statsKey) {
- StatsItem statsItem = this.statsItemTable.get(statsKey);
- if (null != statsItem) {
- return statsItem.getStatsDataInHour();
- }
- return new StatsSnapshot();
- }
-
- public StatsSnapshot getStatsDataInDay(final String statsKey) {
- StatsItem statsItem = this.statsItemTable.get(statsKey);
- if (null != statsItem) {
- return statsItem.getStatsDataInDay();
- }
- return new StatsSnapshot();
- }
-
- public StatsItem getStatsItem(final String statsKey) {
- return this.statsItemTable.get(statsKey);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
deleted file mode 100644
index 4092a2b..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.stats;
-
-public class StatsSnapshot {
- private long sum;
- private double tps;
- private double avgpt;
-
-
- public long getSum() {
- return sum;
- }
-
-
- public void setSum(long sum) {
- this.sum = sum;
- }
-
-
- public double getTps() {
- return tps;
- }
-
-
- public void setTps(double tps) {
- this.tps = tps;
- }
-
-
- public double getAvgpt() {
- return avgpt;
- }
-
-
- public void setAvgpt(double avgpt) {
- this.avgpt = avgpt;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java b/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
deleted file mode 100644
index cf8baf2..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.subscription;
-
-import com.alibaba.rocketmq.common.MixAll;
-
-
-/**
- * @author shijia.wxr
- */
-public class SubscriptionGroupConfig {
-
- private String groupName;
-
- private boolean consumeEnable = true;
- private boolean consumeFromMinEnable = true;
-
- private boolean consumeBroadcastEnable = true;
-
- private int retryQueueNums = 1;
-
- private int retryMaxTimes = 16;
-
- private long brokerId = MixAll.MASTER_ID;
-
- private long whichBrokerWhenConsumeSlowly = 1;
-
- private boolean notifyConsumerIdsChangedEnable = true;
-
-
- public String getGroupName() {
- return groupName;
- }
-
-
- public void setGroupName(String groupName) {
- this.groupName = groupName;
- }
-
-
- public boolean isConsumeEnable() {
- return consumeEnable;
- }
-
-
- public void setConsumeEnable(boolean consumeEnable) {
- this.consumeEnable = consumeEnable;
- }
-
-
- public boolean isConsumeFromMinEnable() {
- return consumeFromMinEnable;
- }
-
-
- public void setConsumeFromMinEnable(boolean consumeFromMinEnable) {
- this.consumeFromMinEnable = consumeFromMinEnable;
- }
-
-
- public boolean isConsumeBroadcastEnable() {
- return consumeBroadcastEnable;
- }
-
-
- public void setConsumeBroadcastEnable(boolean consumeBroadcastEnable) {
- this.consumeBroadcastEnable = consumeBroadcastEnable;
- }
-
-
- public int getRetryQueueNums() {
- return retryQueueNums;
- }
-
-
- public void setRetryQueueNums(int retryQueueNums) {
- this.retryQueueNums = retryQueueNums;
- }
-
-
- public int getRetryMaxTimes() {
- return retryMaxTimes;
- }
-
-
- public void setRetryMaxTimes(int retryMaxTimes) {
- this.retryMaxTimes = retryMaxTimes;
- }
-
-
- public long getBrokerId() {
- return brokerId;
- }
-
-
- public void setBrokerId(long brokerId) {
- this.brokerId = brokerId;
- }
-
-
- public long getWhichBrokerWhenConsumeSlowly() {
- return whichBrokerWhenConsumeSlowly;
- }
-
-
- public void setWhichBrokerWhenConsumeSlowly(long whichBrokerWhenConsumeSlowly) {
- this.whichBrokerWhenConsumeSlowly = whichBrokerWhenConsumeSlowly;
- }
-
- public boolean isNotifyConsumerIdsChangedEnable() {
- return notifyConsumerIdsChangedEnable;
- }
-
- public void setNotifyConsumerIdsChangedEnable(final boolean notifyConsumerIdsChangedEnable) {
- this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (brokerId ^ (brokerId >>> 32));
- result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
- result = prime * result + (consumeEnable ? 1231 : 1237);
- result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
- result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
- result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
- result = prime * result + retryMaxTimes;
- result = prime * result + retryQueueNums;
- result =
- prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32));
- return result;
- }
-
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj;
- if (brokerId != other.brokerId)
- return false;
- if (consumeBroadcastEnable != other.consumeBroadcastEnable)
- return false;
- if (consumeEnable != other.consumeEnable)
- return false;
- if (consumeFromMinEnable != other.consumeFromMinEnable)
- return false;
- if (groupName == null) {
- if (other.groupName != null)
- return false;
- } else if (!groupName.equals(other.groupName))
- return false;
- if (retryMaxTimes != other.retryMaxTimes)
- return false;
- if (retryQueueNums != other.retryQueueNums)
- return false;
- if (whichBrokerWhenConsumeSlowly != other.whichBrokerWhenConsumeSlowly)
- return false;
- if (notifyConsumerIdsChangedEnable != other.notifyConsumerIdsChangedEnable)
- return false;
- return true;
- }
-
-
- @Override
- public String toString() {
- return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
- + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
- + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
- + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
- + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
- + notifyConsumerIdsChangedEnable + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
deleted file mode 100644
index 2f9d057..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.sysflag;
-
-/**
- * @author shijia.wxr
- */
-public class MessageSysFlag {
- public final static int COMPRESSED_FLAG = 0x1;
- public final static int MULTI_TAGS_FLAG = 0x1 << 1;
- public final static int TRANSACTION_NOT_TYPE = 0;
- public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
- public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
- public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
-
-
- public static int getTransactionValue(final int flag) {
- return flag & TRANSACTION_ROLLBACK_TYPE;
- }
-
-
- public static int resetTransactionValue(final int flag, final int type) {
- return (flag & (~TRANSACTION_ROLLBACK_TYPE)) | type;
- }
-
-
- public static int clearCompressedFlag(final int flag) {
- return flag & (~COMPRESSED_FLAG);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
deleted file mode 100644
index d0f7287..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.sysflag;
-
-/**
- * @author shijia.wxr
- */
-public class PullSysFlag {
- private final static int FLAG_COMMIT_OFFSET = 0x1 << 0;
- private final static int FLAG_SUSPEND = 0x1 << 1;
- private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
- private final static int FLAG_CLASS_FILTER = 0x1 << 3;
-
-
- public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
- final boolean subscription, final boolean classFilter) {
- int flag = 0;
-
- if (commitOffset) {
- flag |= FLAG_COMMIT_OFFSET;
- }
-
- if (suspend) {
- flag |= FLAG_SUSPEND;
- }
-
- if (subscription) {
- flag |= FLAG_SUBSCRIPTION;
- }
-
- if (classFilter) {
- flag |= FLAG_CLASS_FILTER;
- }
-
- return flag;
- }
-
-
- public static int clearCommitOffsetFlag(final int sysFlag) {
- return sysFlag & (~FLAG_COMMIT_OFFSET);
- }
-
-
- public static boolean hasCommitOffsetFlag(final int sysFlag) {
- return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
- }
-
-
- public static boolean hasSuspendFlag(final int sysFlag) {
- return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
- }
-
-
- public static boolean hasSubscriptionFlag(final int sysFlag) {
- return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
- }
-
-
- public static boolean hasClassFilterFlag(final int sysFlag) {
- return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
deleted file mode 100644
index 65e3115..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.sysflag;
-
-/**
- * @author manhong.yqd
- */
-public class SubscriptionSysFlag {
-
- private final static int FLAG_UNIT = 0x1 << 0;
-
-
- public static int buildSysFlag(final boolean unit) {
- int sysFlag = 0;
-
- if (unit) {
- sysFlag |= FLAG_UNIT;
- }
-
- return sysFlag;
- }
-
-
- public static int setUnitFlag(final int sysFlag) {
- return sysFlag | FLAG_UNIT;
- }
-
-
- public static int clearUnitFlag(final int sysFlag) {
- return sysFlag & (~FLAG_UNIT);
- }
-
-
- public static boolean hasUnitFlag(final int sysFlag) {
- return (sysFlag & FLAG_UNIT) == FLAG_UNIT;
- }
-
-
- public static void main(String[] args) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
deleted file mode 100644
index 90d48f4..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.sysflag;
-
-/**
-
- *
- * @author manhong.yqd
- *
- */
-public class TopicSysFlag {
-
- private final static int FLAG_UNIT = 0x1 << 0;
-
- private final static int FLAG_UNIT_SUB = 0x1 << 1;
-
-
- public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) {
- int sysFlag = 0;
-
- if (unit) {
- sysFlag |= FLAG_UNIT;
- }
-
- if (hasUnitSub) {
- sysFlag |= FLAG_UNIT_SUB;
- }
-
- return sysFlag;
- }
-
-
- public static int setUnitFlag(final int sysFlag) {
- return sysFlag | FLAG_UNIT;
- }
-
-
- public static int clearUnitFlag(final int sysFlag) {
- return sysFlag & (~FLAG_UNIT);
- }
-
-
- public static boolean hasUnitFlag(final int sysFlag) {
- return (sysFlag & FLAG_UNIT) == FLAG_UNIT;
- }
-
-
- public static int setUnitSubFlag(final int sysFlag) {
- return sysFlag | FLAG_UNIT_SUB;
- }
-
-
- public static int clearUnitSubFlag(final int sysFlag) {
- return sysFlag & (~FLAG_UNIT_SUB);
- }
-
-
- public static boolean hasUnitSubFlag(final int sysFlag) {
- return (sysFlag & FLAG_UNIT_SUB) == FLAG_UNIT_SUB;
- }
-
-
- public static void main(String[] args) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java b/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
deleted file mode 100644
index 444928f..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.utils;
-
-import io.netty.channel.Channel;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-public class ChannelUtil {
- public static String getRemoteIp(Channel channel) {
- InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress();
- if (inetSocketAddress == null) {
- return "";
- }
- final InetAddress inetAddr = inetSocketAddress.getAddress();
- return inetAddr != null ? inetAddr.getHostAddress() : inetSocketAddress.getHostName();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java b/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
deleted file mode 100755
index dadac46..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.utils;
-
-import com.alibaba.rocketmq.common.MQVersion;
-import com.alibaba.rocketmq.common.MixAll;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.util.Iterator;
-import java.util.List;
-
-
-public class HttpTinyClient {
-
- static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
- String encoding, long readTimeoutMs) throws IOException {
- String encodedContent = encodingParams(paramValues, encoding);
- url += (null == encodedContent) ? "" : ("?" + encodedContent);
-
- HttpURLConnection conn = null;
- try {
- conn = (HttpURLConnection) new URL(url).openConnection();
- conn.setRequestMethod("GET");
- conn.setConnectTimeout((int) readTimeoutMs);
- conn.setReadTimeout((int) readTimeoutMs);
- setHeaders(conn, headers, encoding);
-
- conn.connect();
- int respCode = conn.getResponseCode();
- String resp = null;
-
- if (HttpURLConnection.HTTP_OK == respCode) {
- resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
- } else {
- resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
- }
- return new HttpResult(respCode, resp);
- } finally {
- if (conn != null) {
- conn.disconnect();
- }
- }
- }
-
- static private String encodingParams(List<String> paramValues, String encoding)
- throws UnsupportedEncodingException {
- StringBuilder sb = new StringBuilder();
- if (null == paramValues) {
- return null;
- }
-
- for (Iterator<String> iter = paramValues.iterator(); iter.hasNext(); ) {
- sb.append(iter.next()).append("=");
- sb.append(URLEncoder.encode(iter.next(), encoding));
- if (iter.hasNext()) {
- sb.append("&");
- }
- }
- return sb.toString();
- }
-
- static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
- if (null != headers) {
- for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) {
- conn.addRequestProperty(iter.next(), iter.next());
- }
- }
- conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
- conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
-
-
- String ts = String.valueOf(System.currentTimeMillis());
- conn.addRequestProperty("Metaq-Client-RequestTS", ts);
- }
-
- /**
-
- *
- * @param url
- * @param headers
-
- * @param paramValues
-
- * @param encoding
-
- * @param readTimeoutMs
-
- *
- * @return the http response of given http post request
- *
- * @throws java.io.IOException
- */
- static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
- String encoding, long readTimeoutMs) throws IOException {
- String encodedContent = encodingParams(paramValues, encoding);
-
- HttpURLConnection conn = null;
- try {
- conn = (HttpURLConnection) new URL(url).openConnection();
- conn.setRequestMethod("POST");
- conn.setConnectTimeout(3000);
- conn.setReadTimeout((int) readTimeoutMs);
- conn.setDoOutput(true);
- conn.setDoInput(true);
- setHeaders(conn, headers, encoding);
-
- conn.getOutputStream().write(encodedContent.getBytes(MixAll.DEFAULT_CHARSET));
-
- int respCode = conn.getResponseCode();
- String resp = null;
-
- if (HttpURLConnection.HTTP_OK == respCode) {
- resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
- } else {
- resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
- }
- return new HttpResult(respCode, resp);
- } finally {
- if (null != conn) {
- conn.disconnect();
- }
- }
- }
-
- static public class HttpResult {
- final public int code;
- final public String content;
-
-
- public HttpResult(int code, String content) {
- this.code = code;
- this.content = content;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
deleted file mode 100644
index ced2fae..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.utils;
-
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-import java.io.*;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author manhong.yqd
- */
-public class IOTinyUtils {
-
- static public String toString(InputStream input, String encoding) throws IOException {
- return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader(
- input, encoding));
- }
-
-
- static public String toString(Reader reader) throws IOException {
- CharArrayWriter sw = new CharArrayWriter();
- copy(reader, sw);
- return sw.toString();
- }
-
-
- static public long copy(Reader input, Writer output) throws IOException {
- char[] buffer = new char[1 << 12];
- long count = 0;
- for (int n = 0; (n = input.read(buffer)) >= 0; ) {
- output.write(buffer, 0, n);
- count += n;
- }
- return count;
- }
-
-
- /**
-
- */
- static public List<String> readLines(Reader input) throws IOException {
- BufferedReader reader = toBufferedReader(input);
- List<String> list = new ArrayList<String>();
- String line = null;
- for (;;) {
- line = reader.readLine();
- if (null != line) {
- list.add(line);
- } else {
- break;
- }
- }
- return list;
- }
-
-
- static private BufferedReader toBufferedReader(Reader reader) {
- return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader);
- }
-
-
- static public void copyFile(String source, String target) throws IOException {
- File sf = new File(source);
- if (!sf.exists()) {
- throw new IllegalArgumentException("source file does not exist.");
- }
- File tf = new File(target);
- tf.getParentFile().mkdirs();
- if (!tf.exists() && !tf.createNewFile()) {
- throw new RuntimeException("failed to create target file.");
- }
-
- FileChannel sc = null;
- FileChannel tc = null;
- try {
- tc = new FileOutputStream(tf).getChannel();
- sc = new FileInputStream(sf).getChannel();
- sc.transferTo(0, sc.size(), tc);
- } finally {
- if (null != sc) {
- sc.close();
- }
- if (null != tc) {
- tc.close();
- }
- }
- }
-
-
- public static void delete(File fileOrDir) throws IOException {
- if (fileOrDir == null) {
- return;
- }
-
- if (fileOrDir.isDirectory()) {
- cleanDirectory(fileOrDir);
- }
-
- fileOrDir.delete();
- }
-
-
- /**
-
- */
- public static void cleanDirectory(File directory) throws IOException {
- if (!directory.exists()) {
- String message = directory + " does not exist";
- throw new IllegalArgumentException(message);
- }
-
- if (!directory.isDirectory()) {
- String message = directory + " is not a directory";
- throw new IllegalArgumentException(message);
- }
-
- File[] files = directory.listFiles();
- if (files == null) { // null if security restricted
- throw new IOException("Failed to list contents of " + directory);
- }
-
- IOException exception = null;
- for (File file : files) {
- try {
- delete(file);
- } catch (IOException ioe) {
- exception = ioe;
- }
- }
-
- if (null != exception) {
- throw exception;
- }
- }
-
-
- public static void writeStringToFile(File file, String data, String encoding) throws IOException {
- OutputStream os = null;
- try {
- os = new FileOutputStream(file);
- os.write(data.getBytes(encoding));
- } finally {
- if (null != os) {
- os.close();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
new file mode 100644
index 0000000..f035ed6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerConfig {
+ private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ @ImportantField
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ @ImportantField
+ private String brokerIP1 = RemotingUtil.getLocalAddress();
+ private String brokerIP2 = RemotingUtil.getLocalAddress();
+ @ImportantField
+ private String brokerName = localHostName();
+ @ImportantField
+ private String brokerClusterName = "DefaultCluster";
+ @ImportantField
+ private long brokerId = MixAll.MASTER_ID;
+ private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
+ private int defaultTopicQueueNums = 8;
+ @ImportantField
+ private boolean autoCreateTopicEnable = true;
+
+ private boolean clusterTopicEnable = true;
+
+ private boolean brokerTopicEnable = true;
+ @ImportantField
+ private boolean autoCreateSubscriptionGroup = true;
+ private String messageStorePlugIn = "";
+
+ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
+ private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+ private int adminBrokerThreadPoolNums = 16;
+ private int clientManageThreadPoolNums = 32;
+ private int consumerManageThreadPoolNums = 32;
+
+ private int flushConsumerOffsetInterval = 1000 * 5;
+
+ private int flushConsumerOffsetHistoryInterval = 1000 * 60;
+
+ @ImportantField
+ private boolean rejectTransactionMessage = false;
+ @ImportantField
+ private boolean fetchNamesrvAddrByAddressServer = false;
+ private int sendThreadPoolQueueCapacity = 10000;
+ private int pullThreadPoolQueueCapacity = 100000;
+ private int clientManagerThreadPoolQueueCapacity = 1000000;
+ private int consumerManagerThreadPoolQueueCapacity = 1000000;
+
+ private int filterServerNums = 0;
+
+ private boolean longPollingEnable = true;
+
+ private long shortPollingTimeMills = 1000;
+
+ private boolean notifyConsumerIdsChangedEnable = true;
+
+ private boolean highSpeedMode = false;
+
+ private boolean commercialEnable = true;
+ private int commercialTimerCount = 1;
+ private int commercialTransCount = 1;
+ private int commercialBigCount = 1;
+ private int commercialBaseCount = 1;
+
+ private boolean transferMsgByHeap = true;
+ private int maxDelayTime = 40;
+
+
+ private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
+ private int registerBrokerTimeoutMills = 6000;
+
+ private boolean slaveReadEnable = false;
+
+ private boolean disableConsumeIfConsumerReadSlowly = false;
+ private long consumerFallbehindThreshold = 1024 * 1024 * 1024 * 16;
+
+ private long waitTimeMillsInSendQueue = 200;
+
+ private long startAcceptSendRequestTimeStamp = 0L;
+
+ private boolean traceOn = true;
+
+ public boolean isTraceOn() {
+ return traceOn;
+ }
+
+ public void setTraceOn(final boolean traceOn) {
+ this.traceOn = traceOn;
+ }
+
+ public long getStartAcceptSendRequestTimeStamp() {
+ return startAcceptSendRequestTimeStamp;
+ }
+
+ public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) {
+ this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
+ }
+
+ public long getWaitTimeMillsInSendQueue() {
+ return waitTimeMillsInSendQueue;
+ }
+
+ public void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) {
+ this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue;
+ }
+
+ public long getConsumerFallbehindThreshold() {
+ return consumerFallbehindThreshold;
+ }
+
+ public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) {
+ this.consumerFallbehindThreshold = consumerFallbehindThreshold;
+ }
+
+ public boolean isDisableConsumeIfConsumerReadSlowly() {
+ return disableConsumeIfConsumerReadSlowly;
+ }
+
+ public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) {
+ this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly;
+ }
+
+ public boolean isSlaveReadEnable() {
+ return slaveReadEnable;
+ }
+
+ public void setSlaveReadEnable(final boolean slaveReadEnable) {
+ this.slaveReadEnable = slaveReadEnable;
+ }
+
+ public static String localHostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ return "DEFAULT_BROKER";
+ }
+
+ public int getRegisterBrokerTimeoutMills() {
+ return registerBrokerTimeoutMills;
+ }
+
+ public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) {
+ this.registerBrokerTimeoutMills = registerBrokerTimeoutMills;
+ }
+
+ public String getRegionId() {
+ return regionId;
+ }
+
+ public void setRegionId(final String regionId) {
+ this.regionId = regionId;
+ }
+
+ public boolean isTransferMsgByHeap() {
+ return transferMsgByHeap;
+ }
+
+ public void setTransferMsgByHeap(final boolean transferMsgByHeap) {
+ this.transferMsgByHeap = transferMsgByHeap;
+ }
+
+ public String getMessageStorePlugIn() {
+ return messageStorePlugIn;
+ }
+
+ public void setMessageStorePlugIn(String messageStorePlugIn) {
+ this.messageStorePlugIn = messageStorePlugIn;
+ }
+
+ public boolean isHighSpeedMode() {
+ return highSpeedMode;
+ }
+
+
+ public void setHighSpeedMode(final boolean highSpeedMode) {
+ this.highSpeedMode = highSpeedMode;
+ }
+
+
+ public String getRocketmqHome() {
+ return rocketmqHome;
+ }
+
+
+ public void setRocketmqHome(String rocketmqHome) {
+ this.rocketmqHome = rocketmqHome;
+ }
+
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+
+ public int getBrokerPermission() {
+ return brokerPermission;
+ }
+
+
+ public void setBrokerPermission(int brokerPermission) {
+ this.brokerPermission = brokerPermission;
+ }
+
+
+ public int getDefaultTopicQueueNums() {
+ return defaultTopicQueueNums;
+ }
+
+
+ public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
+ this.defaultTopicQueueNums = defaultTopicQueueNums;
+ }
+
+
+ public boolean isAutoCreateTopicEnable() {
+ return autoCreateTopicEnable;
+ }
+
+
+ public void setAutoCreateTopicEnable(boolean autoCreateTopic) {
+ this.autoCreateTopicEnable = autoCreateTopic;
+ }
+
+
+ public String getBrokerClusterName() {
+ return brokerClusterName;
+ }
+
+
+ public void setBrokerClusterName(String brokerClusterName) {
+ this.brokerClusterName = brokerClusterName;
+ }
+
+
+ public String getBrokerIP1() {
+ return brokerIP1;
+ }
+
+
+ public void setBrokerIP1(String brokerIP1) {
+ this.brokerIP1 = brokerIP1;
+ }
+
+
+ public String getBrokerIP2() {
+ return brokerIP2;
+ }
+
+
+ public void setBrokerIP2(String brokerIP2) {
+ this.brokerIP2 = brokerIP2;
+ }
+
+ public int getSendMessageThreadPoolNums() {
+ return sendMessageThreadPoolNums;
+ }
+
+ public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) {
+ this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
+ }
+
+
+ public int getPullMessageThreadPoolNums() {
+ return pullMessageThreadPoolNums;
+ }
+
+
+ public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
+ this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
+ }
+
+
+ public int getAdminBrokerThreadPoolNums() {
+ return adminBrokerThreadPoolNums;
+ }
+
+
+ public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) {
+ this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums;
+ }
+
+
+ public int getFlushConsumerOffsetInterval() {
+ return flushConsumerOffsetInterval;
+ }
+
+
+ public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) {
+ this.flushConsumerOffsetInterval = flushConsumerOffsetInterval;
+ }
+
+
+ public int getFlushConsumerOffsetHistoryInterval() {
+ return flushConsumerOffsetHistoryInterval;
+ }
+
+
+ public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) {
+ this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval;
+ }
+
+
+ public boolean isClusterTopicEnable() {
+ return clusterTopicEnable;
+ }
+
+
+ public void setClusterTopicEnable(boolean clusterTopicEnable) {
+ this.clusterTopicEnable = clusterTopicEnable;
+ }
+
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+
+ public long getBrokerId() {
+ return brokerId;
+ }
+
+
+ public void setBrokerId(long brokerId) {
+ this.brokerId = brokerId;
+ }
+
+
+ public boolean isAutoCreateSubscriptionGroup() {
+ return autoCreateSubscriptionGroup;
+ }
+
+
+ public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
+ this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
+ }
+
+
+ public boolean isRejectTransactionMessage() {
+ return rejectTransactionMessage;
+ }
+
+
+ public void setRejectTransactionMessage(boolean rejectTransactionMessage) {
+ this.rejectTransactionMessage = rejectTransactionMessage;
+ }
+
+
+ public boolean isFetchNamesrvAddrByAddressServer() {
+ return fetchNamesrvAddrByAddressServer;
+ }
+
+
+ public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
+ this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
+ }
+
+
+ public int getSendThreadPoolQueueCapacity() {
+ return sendThreadPoolQueueCapacity;
+ }
+
+
+ public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) {
+ this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
+ }
+
+
+ public int getPullThreadPoolQueueCapacity() {
+ return pullThreadPoolQueueCapacity;
+ }
+
+
+ public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
+ this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
+ }
+
+
+ public boolean isBrokerTopicEnable() {
+ return brokerTopicEnable;
+ }
+
+
+ public void setBrokerTopicEnable(boolean brokerTopicEnable) {
+ this.brokerTopicEnable = brokerTopicEnable;
+ }
+
+
+ public int getFilterServerNums() {
+ return filterServerNums;
+ }
+
+
+ public void setFilterServerNums(int filterServerNums) {
+ this.filterServerNums = filterServerNums;
+ }
+
+
+ public boolean isLongPollingEnable() {
+ return longPollingEnable;
+ }
+
+
+ public void setLongPollingEnable(boolean longPollingEnable) {
+ this.longPollingEnable = longPollingEnable;
+ }
+
+
+ public boolean isNotifyConsumerIdsChangedEnable() {
+ return notifyConsumerIdsChangedEnable;
+ }
+
+
+ public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
+ this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+ }
+
+
+ public long getShortPollingTimeMills() {
+ return shortPollingTimeMills;
+ }
+
+
+ public void setShortPollingTimeMills(long shortPollingTimeMills) {
+ this.shortPollingTimeMills = shortPollingTimeMills;
+ }
+
+
+ public int getClientManageThreadPoolNums() {
+ return clientManageThreadPoolNums;
+ }
+
+
+ public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) {
+ this.clientManageThreadPoolNums = clientManageThreadPoolNums;
+ }
+
+
+ public boolean isCommercialEnable() {
+ return commercialEnable;
+ }
+
+
+ public void setCommercialEnable(final boolean commercialEnable) {
+ this.commercialEnable = commercialEnable;
+ }
+
+ public int getCommercialTimerCount() {
+ return commercialTimerCount;
+ }
+
+ public void setCommercialTimerCount(final int commercialTimerCount) {
+ this.commercialTimerCount = commercialTimerCount;
+ }
+
+ public int getCommercialTransCount() {
+ return commercialTransCount;
+ }
+
+ public void setCommercialTransCount(final int commercialTransCount) {
+ this.commercialTransCount = commercialTransCount;
+ }
+
+ public int getCommercialBigCount() {
+ return commercialBigCount;
+ }
+
+ public void setCommercialBigCount(final int commercialBigCount) {
+ this.commercialBigCount = commercialBigCount;
+ }
+
+ public int getMaxDelayTime() {
+ return maxDelayTime;
+ }
+
+
+ public void setMaxDelayTime(final int maxDelayTime) {
+ this.maxDelayTime = maxDelayTime;
+ }
+
+ public int getClientManagerThreadPoolQueueCapacity() {
+ return clientManagerThreadPoolQueueCapacity;
+ }
+
+ public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) {
+ this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity;
+ }
+
+ public int getConsumerManagerThreadPoolQueueCapacity() {
+ return consumerManagerThreadPoolQueueCapacity;
+ }
+
+ public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) {
+ this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity;
+ }
+
+ public int getConsumerManageThreadPoolNums() {
+ return consumerManageThreadPoolNums;
+ }
+
+ public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
+ this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
+ }
+
+ public int getCommercialBaseCount() {
+ return commercialBaseCount;
+ }
+
+ public void setCommercialBaseCount(int commercialBaseCount) {
+ this.commercialBaseCount = commercialBaseCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java
new file mode 100644
index 0000000..fc73b71
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BrokerConfigSingleton {
+ private static AtomicBoolean isInit = new AtomicBoolean();
+ private static BrokerConfig brokerConfig;
+
+ public static BrokerConfig getBrokerConfig() {
+ if (brokerConfig == null) {
+ throw new IllegalArgumentException("brokerConfig Cannot be null !");
+ }
+ return brokerConfig;
+ }
+
+ public static void setBrokerConfig(BrokerConfig brokerConfig) {
+ if (!isInit.compareAndSet(false, true)) {
+ throw new IllegalArgumentException("broker config have inited !");
+ }
+ BrokerConfigSingleton.brokerConfig = brokerConfig;
+ }
+}