You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:02 UTC
[06/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
new file mode 100644
index 0000000..42bb561
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class StatsItem {
+
+ private final AtomicLong value = new AtomicLong(0);
+
+ private final AtomicLong times = new AtomicLong(0);
+
+ private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
+
+
+ private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
+
+
+ private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();
+
+ private final String statsName;
+ private final String statsKey;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Logger log;
+
+
+ public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService,
+ Logger log) {
+ this.statsName = statsName;
+ this.statsKey = statsKey;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.log = log;
+ }
+
+ public StatsSnapshot getStatsDataInMinute() {
+ return computeStatsData(this.csListMinute);
+ }
+
+ private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
+ StatsSnapshot statsSnapshot = new StatsSnapshot();
+ synchronized (csList) {
+ double tps = 0;
+ double avgpt = 0;
+ long sum = 0;
+ if (!csList.isEmpty()) {
+ CallSnapshot first = csList.getFirst();
+ CallSnapshot last = csList.getLast();
+ sum = last.getValue() - first.getValue();
+ tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
+
+ long timesDiff = last.getTimes() - first.getTimes();
+ if (timesDiff > 0) {
+ avgpt = (sum * 1.0d) / timesDiff;
+ }
+ }
+
+ statsSnapshot.setSum(sum);
+ statsSnapshot.setTps(tps);
+ statsSnapshot.setAvgpt(avgpt);
+ }
+
+ return statsSnapshot;
+ }
+
+ public StatsSnapshot getStatsDataInHour() {
+ return computeStatsData(this.csListHour);
+ }
+
+ public StatsSnapshot getStatsDataInDay() {
+ return computeStatsData(this.csListDay);
+ }
+
+ public void init() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInSeconds();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 10, TimeUnit.SECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInMinutes();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 10, TimeUnit.MINUTES);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInHour();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 1, TimeUnit.HOURS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtMinutes();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtHour();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtDay();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+ }
+
+ public void samplingInSeconds() {
+ synchronized (this.csListMinute) {
+ this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
+ .get()));
+ if (this.csListMinute.size() > 7) {
+ this.csListMinute.removeFirst();
+ }
+ }
+ }
+
+ public void samplingInMinutes() {
+ synchronized (this.csListHour) {
+ this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
+ .get()));
+ if (this.csListHour.size() > 7) {
+ this.csListHour.removeFirst();
+ }
+ }
+ }
+
+ public void samplingInHour() {
+ synchronized (this.csListDay) {
+ this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
+ .get()));
+ if (this.csListDay.size() > 25) {
+ this.csListDay.removeFirst();
+ }
+ }
+ }
+
+ public void printAtMinutes() {
+ StatsSnapshot ss = computeStatsData(this.csListMinute);
+ log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt()));
+ }
+
+ public void printAtHour() {
+ StatsSnapshot ss = computeStatsData(this.csListHour);
+ log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt()));
+ }
+
+ public void printAtDay() {
+ StatsSnapshot ss = computeStatsData(this.csListDay);
+ log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt()));
+ }
+
+ public AtomicLong getValue() {
+ return value;
+ }
+
+
+ public String getStatsKey() {
+ return statsKey;
+ }
+
+
+ public String getStatsName() {
+ return statsName;
+ }
+
+
+ public AtomicLong getTimes() {
+ return times;
+ }
+}
+
+
+/**
+ * Immutable triple of a timestamp and the two counter readings taken at that moment.
+ */
+class CallSnapshot {
+    private final long timestamp;
+    private final long times;
+    private final long value;
+
+    /**
+     * @param timestamp time the readings were taken
+     * @param times call-count reading
+     * @param value value reading
+     */
+    public CallSnapshot(long timestamp, long times, long value) {
+        this.value = value;
+        this.times = times;
+        this.timestamp = timestamp;
+    }
+
+    /** @return the timestamp passed to the constructor */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** @return the call-count reading passed to the constructor */
+    public long getTimes() {
+        return this.times;
+    }
+
+    /** @return the value reading passed to the constructor */
+    public long getValue() {
+        return this.value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
new file mode 100644
index 0000000..919745d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A keyed collection of {@link StatsItem}s that share one stats name, one executor and one
+ * logger. The set schedules the periodic sampling and printing itself and fans each tick
+ * out to every item it currently holds.
+ */
+public class StatsItemSet {
+    private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable =
+        new ConcurrentHashMap<String, StatsItem>(128);
+
+    private final String statsName;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+    /**
+     * Creates the set and immediately schedules its periodic tasks via {@link #init()}.
+     *
+     * @param statsName name handed to every item created by this set
+     * @param scheduledExecutorService executor that runs the periodic tasks
+     * @param log logger handed to every item created by this set
+     */
+    public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+        this.statsName = statsName;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+        this.init();
+    }
+
+    /**
+     * Schedules the three sampling tasks (every 10 seconds, 10 minutes and 1 hour) and the
+     * three print tasks (every minute, hour and day).
+     *
+     * <p>Every task swallows whatever it throws: {@code scheduleAtFixedRate} suppresses all
+     * subsequent executions of a task once one run ends with an exception.
+     */
+    public void init() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInSeconds();
+                } catch (Throwable ignored) {
+                    // swallowed so the periodic task keeps running
+                }
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInMinutes();
+                } catch (Throwable ignored) {
+                    // swallowed so the periodic task keeps running
+                }
+            }
+        }, 0, 10, TimeUnit.MINUTES);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInHour();
+                } catch (Throwable ignored) {
+                    // swallowed so the periodic task keeps running
+                }
+            }
+        }, 0, 1, TimeUnit.HOURS);
+
+        // NOTE(review): the UtilAll.computNext*TimeMillis() helpers presumably return the epoch
+        // millis of the next minute/hour/day boundary - confirm against UtilAll.
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+                } catch (Throwable ignored) {
+                    // swallowed so the periodic task keeps running
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtHour();
+                } catch (Throwable ignored) {
+                    // swallowed so the periodic task keeps running
+                }
+            }
+        }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtDay();
+                } catch (Throwable ignored) {
+                    // swallowed so the periodic task keeps running
+                }
+            }
+        }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+    }
+
+    private void samplingInSeconds() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.samplingInSeconds();
+        }
+    }
+
+    private void samplingInMinutes() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.samplingInMinutes();
+        }
+    }
+
+    private void samplingInHour() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.samplingInHour();
+        }
+    }
+
+    private void printAtMinutes() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.printAtMinutes();
+        }
+    }
+
+    private void printAtHour() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.printAtHour();
+        }
+    }
+
+    private void printAtDay() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.printAtDay();
+        }
+    }
+
+    /**
+     * Adds to the counters of the item registered under {@code statsKey}, creating the item
+     * first if necessary.
+     *
+     * @param statsKey key of the item to update
+     * @param incValue amount added to the item's value counter
+     * @param incTimes amount added to the item's call counter
+     */
+    public void addValue(final String statsKey, final int incValue, final int incTimes) {
+        StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+        statsItem.getValue().addAndGet(incValue);
+        statsItem.getTimes().addAndGet(incTimes);
+    }
+
+    /**
+     * Returns the item registered under {@code statsKey}, registering a new one if none exists.
+     *
+     * <p>Registration uses {@code putIfAbsent}, so when several threads race on a new key they
+     * all receive the single instance that ended up in the table. The item's own
+     * {@code init()} is not called; this set's scheduled tasks drive its sampling and printing.
+     *
+     * @param statsKey key of the item
+     * @return the item stored in the table for that key, never {@code null}
+     */
+    public StatsItem getAndCreateStatsItem(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null == statsItem) {
+            StatsItem created = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+            StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, created);
+            // Another thread may have registered an item first; use theirs so no counts are lost.
+            statsItem = (null != prev) ? prev : created;
+        }
+
+        return statsItem;
+    }
+
+    /** @return minute-window statistics for the key, or an all-zero snapshot if the key is unknown */
+    public StatsSnapshot getStatsDataInMinute(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null != statsItem) {
+            return statsItem.getStatsDataInMinute();
+        }
+        return new StatsSnapshot();
+    }
+
+    /** @return hour-window statistics for the key, or an all-zero snapshot if the key is unknown */
+    public StatsSnapshot getStatsDataInHour(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null != statsItem) {
+            return statsItem.getStatsDataInHour();
+        }
+        return new StatsSnapshot();
+    }
+
+    /** @return day-window statistics for the key, or an all-zero snapshot if the key is unknown */
+    public StatsSnapshot getStatsDataInDay(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null != statsItem) {
+            return statsItem.getStatsDataInDay();
+        }
+        return new StatsSnapshot();
+    }
+
+    /** @return the item registered under the key, or {@code null} if there is none */
+    public StatsItem getStatsItem(final String statsKey) {
+        return this.statsItemTable.get(statsKey);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
new file mode 100644
index 0000000..652d214
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+/**
+ * Mutable holder for three computed statistics: a sum, a rate ({@code tps}) and an
+ * average ({@code avgpt}). All three start at zero.
+ */
+public class StatsSnapshot {
+    private long sum;
+    private double tps;
+    private double avgpt;
+
+    /** @return the stored sum */
+    public long getSum() {
+        return this.sum;
+    }
+
+    /** @param sum the sum to store */
+    public void setSum(long sum) {
+        this.sum = sum;
+    }
+
+    /** @return the stored rate */
+    public double getTps() {
+        return this.tps;
+    }
+
+    /** @param tps the rate to store */
+    public void setTps(double tps) {
+        this.tps = tps;
+    }
+
+    /** @return the stored average */
+    public double getAvgpt() {
+        return this.avgpt;
+    }
+
+    /** @param avgpt the average to store */
+    public void setAvgpt(double avgpt) {
+        this.avgpt = avgpt;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
new file mode 100644
index 0000000..5c3a3c3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import org.apache.rocketmq.common.MixAll;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupConfig {
+
+ private String groupName;
+
+ private boolean consumeEnable = true;
+ private boolean consumeFromMinEnable = true;
+
+ private boolean consumeBroadcastEnable = true;
+
+ private int retryQueueNums = 1;
+
+ private int retryMaxTimes = 16;
+
+ private long brokerId = MixAll.MASTER_ID;
+
+ private long whichBrokerWhenConsumeSlowly = 1;
+
+ private boolean notifyConsumerIdsChangedEnable = true;
+
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+
+
+ public boolean isConsumeEnable() {
+ return consumeEnable;
+ }
+
+
+ public void setConsumeEnable(boolean consumeEnable) {
+ this.consumeEnable = consumeEnable;
+ }
+
+
+ public boolean isConsumeFromMinEnable() {
+ return consumeFromMinEnable;
+ }
+
+
+ public void setConsumeFromMinEnable(boolean consumeFromMinEnable) {
+ this.consumeFromMinEnable = consumeFromMinEnable;
+ }
+
+
+ public boolean isConsumeBroadcastEnable() {
+ return consumeBroadcastEnable;
+ }
+
+
+ public void setConsumeBroadcastEnable(boolean consumeBroadcastEnable) {
+ this.consumeBroadcastEnable = consumeBroadcastEnable;
+ }
+
+
+ public int getRetryQueueNums() {
+ return retryQueueNums;
+ }
+
+
+ public void setRetryQueueNums(int retryQueueNums) {
+ this.retryQueueNums = retryQueueNums;
+ }
+
+
+ public int getRetryMaxTimes() {
+ return retryMaxTimes;
+ }
+
+
+ public void setRetryMaxTimes(int retryMaxTimes) {
+ this.retryMaxTimes = retryMaxTimes;
+ }
+
+
+ public long getBrokerId() {
+ return brokerId;
+ }
+
+
+ public void setBrokerId(long brokerId) {
+ this.brokerId = brokerId;
+ }
+
+
+ public long getWhichBrokerWhenConsumeSlowly() {
+ return whichBrokerWhenConsumeSlowly;
+ }
+
+
+ public void setWhichBrokerWhenConsumeSlowly(long whichBrokerWhenConsumeSlowly) {
+ this.whichBrokerWhenConsumeSlowly = whichBrokerWhenConsumeSlowly;
+ }
+
+ public boolean isNotifyConsumerIdsChangedEnable() {
+ return notifyConsumerIdsChangedEnable;
+ }
+
+ public void setNotifyConsumerIdsChangedEnable(final boolean notifyConsumerIdsChangedEnable) {
+ this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (brokerId ^ (brokerId >>> 32));
+ result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
+ result = prime * result + (consumeEnable ? 1231 : 1237);
+ result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
+ result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
+ result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
+ result = prime * result + retryMaxTimes;
+ result = prime * result + retryQueueNums;
+ result =
+ prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32));
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj;
+ if (brokerId != other.brokerId)
+ return false;
+ if (consumeBroadcastEnable != other.consumeBroadcastEnable)
+ return false;
+ if (consumeEnable != other.consumeEnable)
+ return false;
+ if (consumeFromMinEnable != other.consumeFromMinEnable)
+ return false;
+ if (groupName == null) {
+ if (other.groupName != null)
+ return false;
+ } else if (!groupName.equals(other.groupName))
+ return false;
+ if (retryMaxTimes != other.retryMaxTimes)
+ return false;
+ if (retryQueueNums != other.retryQueueNums)
+ return false;
+ if (whichBrokerWhenConsumeSlowly != other.whichBrokerWhenConsumeSlowly)
+ return false;
+ if (notifyConsumerIdsChangedEnable != other.notifyConsumerIdsChangedEnable)
+ return false;
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
+ + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
+ + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
+ + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
+ + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
+ + notifyConsumerIdsChangedEnable + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
new file mode 100644
index 0000000..8a069e5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+ * @author shijia.wxr
+ */
+public class MessageSysFlag {
+    /** Bit 0. */
+    public static final int COMPRESSED_FLAG = 1;
+    /** Bit 1. */
+    public static final int MULTI_TAGS_FLAG = 1 << 1;
+
+    // The transaction type is a two-bit field stored in bits 2-3.
+    public static final int TRANSACTION_NOT_TYPE = 0;
+    public static final int TRANSACTION_PREPARED_TYPE = 1 << 2;
+    public static final int TRANSACTION_COMMIT_TYPE = 2 << 2;
+    /** Has both transaction bits set, so it also serves as the mask for the field. */
+    public static final int TRANSACTION_ROLLBACK_TYPE = 3 << 2;
+
+    /**
+     * Extracts the transaction-type bits from a flag word.
+     *
+     * @param flag full flag word
+     * @return {@code flag} with every bit outside the transaction field cleared
+     */
+    public static int getTransactionValue(final int flag) {
+        final int transactionBits = flag & TRANSACTION_ROLLBACK_TYPE;
+        return transactionBits;
+    }
+
+    /**
+     * Clears the transaction-type bits of {@code flag} and ORs in {@code type}.
+     *
+     * @param flag full flag word
+     * @param type bits to OR into the cleared word
+     * @return the updated flag word
+     */
+    public static int resetTransactionValue(final int flag, final int type) {
+        final int withoutTransaction = flag & ~TRANSACTION_ROLLBACK_TYPE;
+        return withoutTransaction | type;
+    }
+
+    /**
+     * @param flag full flag word
+     * @return {@code flag} with the compressed bit cleared
+     */
+    public static int clearCompressedFlag(final int flag) {
+        final int keepMask = ~COMPRESSED_FLAG;
+        return flag & keepMask;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
new file mode 100644
index 0000000..cc2a5c8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+ * @author shijia.wxr
+ */
+public class PullSysFlag {
+    private static final int FLAG_COMMIT_OFFSET = 1;
+    private static final int FLAG_SUSPEND = 1 << 1;
+    private static final int FLAG_SUBSCRIPTION = 1 << 2;
+    private static final int FLAG_CLASS_FILTER = 1 << 3;
+
+    /**
+     * Packs four booleans into a flag word, one bit each (bits 0 to 3 in parameter order).
+     *
+     * @return the OR of the bits whose corresponding argument is {@code true}
+     */
+    public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
+        final boolean subscription, final boolean classFilter) {
+        return (commitOffset ? FLAG_COMMIT_OFFSET : 0)
+            | (suspend ? FLAG_SUSPEND : 0)
+            | (subscription ? FLAG_SUBSCRIPTION : 0)
+            | (classFilter ? FLAG_CLASS_FILTER : 0);
+    }
+
+    /** @return {@code sysFlag} with the commit-offset bit cleared */
+    public static int clearCommitOffsetFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_COMMIT_OFFSET;
+        return sysFlag & keepMask;
+    }
+
+    /** @return whether the commit-offset bit is set */
+    public static boolean hasCommitOffsetFlag(final int sysFlag) {
+        return (sysFlag & FLAG_COMMIT_OFFSET) != 0;
+    }
+
+    /** @return whether the suspend bit is set */
+    public static boolean hasSuspendFlag(final int sysFlag) {
+        return (sysFlag & FLAG_SUSPEND) != 0;
+    }
+
+    /** @return whether the subscription bit is set */
+    public static boolean hasSubscriptionFlag(final int sysFlag) {
+        return (sysFlag & FLAG_SUBSCRIPTION) != 0;
+    }
+
+    /** @return whether the class-filter bit is set */
+    public static boolean hasClassFilterFlag(final int sysFlag) {
+        return (sysFlag & FLAG_CLASS_FILTER) != 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
new file mode 100644
index 0000000..2761a0b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+ * @author manhong.yqd
+ */
+public class SubscriptionSysFlag {
+
+    private static final int FLAG_UNIT = 1;
+
+    /** @return a flag word with the unit bit set when {@code unit} is true, otherwise 0 */
+    public static int buildSysFlag(final boolean unit) {
+        return unit ? FLAG_UNIT : 0;
+    }
+
+    /** @return {@code sysFlag} with the unit bit set */
+    public static int setUnitFlag(final int sysFlag) {
+        return FLAG_UNIT | sysFlag;
+    }
+
+    /** @return {@code sysFlag} with the unit bit cleared */
+    public static int clearUnitFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT;
+        return sysFlag & keepMask;
+    }
+
+    /** @return whether the unit bit is set */
+    public static boolean hasUnitFlag(final int sysFlag) {
+        return (sysFlag & FLAG_UNIT) != 0;
+    }
+
+    /** No-op entry point; it does nothing. */
+    public static void main(String[] args) {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
new file mode 100644
index 0000000..b12108a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+
+ *
+ * @author manhong.yqd
+ *
+ */
+public class TopicSysFlag {
+
+    private static final int FLAG_UNIT = 1;
+
+    private static final int FLAG_UNIT_SUB = 1 << 1;
+
+    /**
+     * Packs two booleans into a flag word: bit 0 for {@code unit}, bit 1 for {@code hasUnitSub}.
+     *
+     * @return the OR of the bits whose corresponding argument is {@code true}
+     */
+    public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) {
+        return (unit ? FLAG_UNIT : 0) | (hasUnitSub ? FLAG_UNIT_SUB : 0);
+    }
+
+    /** @return {@code sysFlag} with the unit bit set */
+    public static int setUnitFlag(final int sysFlag) {
+        return FLAG_UNIT | sysFlag;
+    }
+
+    /** @return {@code sysFlag} with the unit bit cleared */
+    public static int clearUnitFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT;
+        return sysFlag & keepMask;
+    }
+
+    /** @return whether the unit bit is set */
+    public static boolean hasUnitFlag(final int sysFlag) {
+        return (sysFlag & FLAG_UNIT) != 0;
+    }
+
+    /** @return {@code sysFlag} with the unit-sub bit set */
+    public static int setUnitSubFlag(final int sysFlag) {
+        return FLAG_UNIT_SUB | sysFlag;
+    }
+
+    /** @return {@code sysFlag} with the unit-sub bit cleared */
+    public static int clearUnitSubFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT_SUB;
+        return sysFlag & keepMask;
+    }
+
+    /** @return whether the unit-sub bit is set */
+    public static boolean hasUnitSubFlag(final int sysFlag) {
+        return (sysFlag & FLAG_UNIT_SUB) != 0;
+    }
+
+    /** No-op entry point; it does nothing. */
+    public static void main(String[] args) {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
new file mode 100644
index 0000000..ab017f2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import io.netty.channel.Channel;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+public class ChannelUtil {
+    /**
+     * Returns the remote peer's address for the given channel as text.
+     *
+     * <p>When the remote address is resolved, its literal IP string is
+     * returned; when it is unresolved, the host name held by the socket
+     * address is returned instead.
+     *
+     * @param channel the channel whose remote peer is inspected
+     * @return the remote IP (or host name when unresolved), or an empty
+     *         string when the channel has no remote address or the address
+     *         is not an {@link InetSocketAddress}
+     */
+    public static String getRemoteIp(Channel channel) {
+        final java.net.SocketAddress remote = channel.remoteAddress();
+        // instanceof is false for null as well, so this one guard covers both
+        // "not connected" and "non-inet transport" without a ClassCastException.
+        if (!(remote instanceof InetSocketAddress)) {
+            return "";
+        }
+        final InetSocketAddress inetSocketAddress = (InetSocketAddress) remote;
+        final InetAddress inetAddr = inetSocketAddress.getAddress();
+        return inetAddr != null ? inetAddr.getHostAddress() : inetSocketAddress.getHostName();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
new file mode 100755
index 0000000..fcd002c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class HttpTinyClient {
+
+ static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
+ String encoding, long readTimeoutMs) throws IOException {
+ String encodedContent = encodingParams(paramValues, encoding);
+ url += (null == encodedContent) ? "" : ("?" + encodedContent);
+
+ HttpURLConnection conn = null;
+ try {
+ conn = (HttpURLConnection) new URL(url).openConnection();
+ conn.setRequestMethod("GET");
+ conn.setConnectTimeout((int) readTimeoutMs);
+ conn.setReadTimeout((int) readTimeoutMs);
+ setHeaders(conn, headers, encoding);
+
+ conn.connect();
+ int respCode = conn.getResponseCode();
+ String resp = null;
+
+ if (HttpURLConnection.HTTP_OK == respCode) {
+ resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
+ } else {
+ resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
+ }
+ return new HttpResult(respCode, resp);
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ }
+
+ static private String encodingParams(List<String> paramValues, String encoding)
+ throws UnsupportedEncodingException {
+ StringBuilder sb = new StringBuilder();
+ if (null == paramValues) {
+ return null;
+ }
+
+ for (Iterator<String> iter = paramValues.iterator(); iter.hasNext(); ) {
+ sb.append(iter.next()).append("=");
+ sb.append(URLEncoder.encode(iter.next(), encoding));
+ if (iter.hasNext()) {
+ sb.append("&");
+ }
+ }
+ return sb.toString();
+ }
+
+ static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
+ if (null != headers) {
+ for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) {
+ conn.addRequestProperty(iter.next(), iter.next());
+ }
+ }
+ conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+ conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
+
+
+ String ts = String.valueOf(System.currentTimeMillis());
+ conn.addRequestProperty("Metaq-Client-RequestTS", ts);
+ }
+
+ /**
+
+ *
+ * @param url
+ * @param headers
+
+ * @param paramValues
+
+ * @param encoding
+
+ * @param readTimeoutMs
+
+ *
+ * @return the http response of given http post request
+ *
+ * @throws java.io.IOException
+ */
+ static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
+ String encoding, long readTimeoutMs) throws IOException {
+ String encodedContent = encodingParams(paramValues, encoding);
+
+ HttpURLConnection conn = null;
+ try {
+ conn = (HttpURLConnection) new URL(url).openConnection();
+ conn.setRequestMethod("POST");
+ conn.setConnectTimeout(3000);
+ conn.setReadTimeout((int) readTimeoutMs);
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ setHeaders(conn, headers, encoding);
+
+ conn.getOutputStream().write(encodedContent.getBytes(MixAll.DEFAULT_CHARSET));
+
+ int respCode = conn.getResponseCode();
+ String resp = null;
+
+ if (HttpURLConnection.HTTP_OK == respCode) {
+ resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
+ } else {
+ resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
+ }
+ return new HttpResult(respCode, resp);
+ } finally {
+ if (null != conn) {
+ conn.disconnect();
+ }
+ }
+ }
+
+ static public class HttpResult {
+ final public int code;
+ final public String content;
+
+
+ public HttpResult(int code, String content) {
+ this.code = code;
+ this.content = content;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
new file mode 100644
index 0000000..3284759
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Small stream, reader and file helpers.
+ *
+ * @author manhong.yqd
+ */
+public class IOTinyUtils {
+
+    /**
+     * Reads the whole stream as text.
+     *
+     * @param input stream to drain; it is not closed by this method
+     * @param encoding charset name, or null to use {@code RemotingHelper.DEFAULT_CHARSET}
+     * @return the decoded content
+     * @throws IOException if reading fails or the charset name is unsupported
+     */
+    static public String toString(InputStream input, String encoding) throws IOException {
+        return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader(
+            input, encoding));
+    }
+
+
+    /**
+     * Reads everything the reader yields into a string. The reader is not closed.
+     */
+    static public String toString(Reader reader) throws IOException {
+        CharArrayWriter sw = new CharArrayWriter();
+        copy(reader, sw);
+        return sw.toString();
+    }
+
+
+    /**
+     * Copies all characters from {@code input} to {@code output} through a
+     * 4096-char buffer. Neither side is closed.
+     *
+     * @return the number of characters copied
+     */
+    static public long copy(Reader input, Writer output) throws IOException {
+        char[] buffer = new char[1 << 12];
+        long count = 0;
+        for (int n = 0; (n = input.read(buffer)) >= 0; ) {
+            output.write(buffer, 0, n);
+            count += n;
+        }
+        return count;
+    }
+
+
+    /**
+     * Reads the input line by line until end of stream.
+     *
+     * @return the lines in reading order, without line terminators
+     */
+    static public List<String> readLines(Reader input) throws IOException {
+        BufferedReader reader = toBufferedReader(input);
+        List<String> list = new ArrayList<String>();
+        String line = null;
+        for (;;) {
+            line = reader.readLine();
+            if (null != line) {
+                list.add(line);
+            } else {
+                break;
+            }
+        }
+        return list;
+    }
+
+
+    /** Returns the reader itself when it is already buffered, otherwise wraps it. */
+    static private BufferedReader toBufferedReader(Reader reader) {
+        return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader);
+    }
+
+
+    /**
+     * Copies the file at {@code source} to {@code target}, creating the
+     * target's parent directories and the target file when missing.
+     *
+     * @throws IllegalArgumentException if the source file does not exist
+     * @throws RuntimeException if the target file cannot be created
+     * @throws IOException if the copy itself fails
+     */
+    static public void copyFile(String source, String target) throws IOException {
+        File sf = new File(source);
+        if (!sf.exists()) {
+            throw new IllegalArgumentException("source file does not exist.");
+        }
+        File tf = new File(target);
+        // A bare file name has no parent; there is nothing to create in that case.
+        File parent = tf.getParentFile();
+        if (null != parent) {
+            parent.mkdirs();
+        }
+        if (!tf.exists() && !tf.createNewFile()) {
+            throw new RuntimeException("failed to create target file.");
+        }
+
+        FileChannel sc = null;
+        FileChannel tc = null;
+        try {
+            tc = new FileOutputStream(tf).getChannel();
+            sc = new FileInputStream(sf).getChannel();
+            // transferTo may move fewer bytes than requested, so keep going
+            // until the whole source has been transferred.
+            final long size = sc.size();
+            long position = 0;
+            while (position < size) {
+                long transferred = sc.transferTo(position, size - position, tc);
+                if (transferred <= 0) {
+                    break;
+                }
+                position += transferred;
+            }
+        } finally {
+            // Nested so that a failure closing the source cannot leak the target.
+            try {
+                if (null != sc) {
+                    sc.close();
+                }
+            } finally {
+                if (null != tc) {
+                    tc.close();
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Deletes a file, or a directory together with its contents. A null
+     * argument is ignored. The result of the final {@link File#delete()} call
+     * is not checked.
+     */
+    public static void delete(File fileOrDir) throws IOException {
+        if (fileOrDir == null) {
+            return;
+        }
+
+        if (fileOrDir.isDirectory()) {
+            cleanDirectory(fileOrDir);
+        }
+
+        fileOrDir.delete();
+    }
+
+
+    /**
+     * Deletes everything inside a directory while keeping the directory itself.
+     *
+     * @throws IllegalArgumentException if {@code directory} does not exist or is not a directory
+     * @throws IOException if the directory cannot be listed, or the last
+     *         failure raised while deleting an entry
+     */
+    public static void cleanDirectory(File directory) throws IOException {
+        if (!directory.exists()) {
+            String message = directory + " does not exist";
+            throw new IllegalArgumentException(message);
+        }
+
+        if (!directory.isDirectory()) {
+            String message = directory + " is not a directory";
+            throw new IllegalArgumentException(message);
+        }
+
+        File[] files = directory.listFiles();
+        if (files == null) { // null if security restricted
+            throw new IOException("Failed to list contents of " + directory);
+        }
+
+        // Keep deleting after a failure; only the most recent error is rethrown.
+        IOException exception = null;
+        for (File file : files) {
+            try {
+                delete(file);
+            } catch (IOException ioe) {
+                exception = ioe;
+            }
+        }
+
+        if (null != exception) {
+            throw exception;
+        }
+    }
+
+
+    /**
+     * Overwrites {@code file} with {@code data} encoded using the given charset name.
+     */
+    public static void writeStringToFile(File file, String data, String encoding) throws IOException {
+        OutputStream os = null;
+        try {
+            os = new FileOutputStream(file);
+            os.write(data.getBytes(encoding));
+        } finally {
+            if (null != os) {
+                os.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java b/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
deleted file mode 100644
index 72e02d0..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common;
-
-import junit.framework.Assert;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.List;
-
-
-/**
- * @author lansheng.zj
- */
-public class MixAllTest {
-
- @Test
- public void test() throws Exception {
- List<String> localInetAddress = MixAll.getLocalInetAddress();
- String local = InetAddress.getLocalHost().getHostAddress();
- Assert.assertTrue(localInetAddress.contains("127.0.0.1"));
- Assert.assertTrue(localInetAddress.contains(local));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
deleted file mode 100644
index e6468b9..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common;
-
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import org.junit.Test;
-
-
-public class RemotingUtilTest {
- @Test
- public void test() throws Exception {
- String a = RemotingUtil.getLocalAddress();
- System.out.println(a);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java b/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
deleted file mode 100644
index 7764fcc..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common;
-
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.Properties;
-
-import static org.junit.Assert.assertTrue;
-
-
-public class UtilAllTest {
-
- @Test
- public void test_currentStackTrace() {
- System.out.println(UtilAll.currentStackTrace());
- }
-
-
- @Test
- public void test_a() {
- URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation();
- System.out.println(url);
- System.out.println(url.getPath());
- }
-
-
- @Test
- public void test_resetClassProperties() {
- DemoConfig demoConfig = new DemoConfig();
- MixAll.properties2Object(new Properties(), demoConfig);
- }
-
-
- @Test
- public void test_properties2String() {
- DemoConfig demoConfig = new DemoConfig();
- Properties properties = MixAll.object2Properties(demoConfig);
- System.out.println(MixAll.properties2String(properties));
- }
-
-
- @Test
- public void test_timeMillisToHumanString() {
- System.out.println(UtilAll.timeMillisToHumanString());
- }
-
-
- @Test
- public void test_isPropertiesEqual() {
- final Properties p1 = new Properties();
- final Properties p2 = new Properties();
-
- p1.setProperty("a", "1");
- p1.setProperty("b", "2");
-
- p2.setProperty("a", "1");
- p2.setProperty("b", "2");
- // p2.setProperty("c", "3");
-
- assertTrue(MixAll.isPropertiesEqual(p1, p2));
- }
-
-
- @Test
- public void test_getpid() {
- int pid = UtilAll.getPid();
-
- System.out.println("PID = " + pid);
- assertTrue(pid > 0);
- }
-
-
- @Test
- public void test_isBlank() {
- {
- boolean result = UtilAll.isBlank("Hello ");
- assertTrue(!result);
- }
-
- {
- boolean result = UtilAll.isBlank(" Hello");
- assertTrue(!result);
- }
-
- {
- boolean result = UtilAll.isBlank("He llo");
- assertTrue(!result);
- }
-
- {
- boolean result = UtilAll.isBlank(" ");
- assertTrue(result);
- }
-
- {
- boolean result = UtilAll.isBlank("Hello");
- assertTrue(!result);
- }
- }
-
- static class DemoConfig {
- private int demoWidth = 0;
- private int demoLength = 0;
- private boolean demoOK = false;
- private String demoName = "haha";
-
-
- public int getDemoWidth() {
- return demoWidth;
- }
-
-
- public void setDemoWidth(int demoWidth) {
- this.demoWidth = demoWidth;
- }
-
-
- public int getDemoLength() {
- return demoLength;
- }
-
-
- public void setDemoLength(int demoLength) {
- this.demoLength = demoLength;
- }
-
-
- public boolean isDemoOK() {
- return demoOK;
- }
-
-
- public void setDemoOK(boolean demoOK) {
- this.demoOK = demoOK;
- }
-
-
- public String getDemoName() {
- return demoName;
- }
-
-
- public void setDemoNfieldame(String demoName) {
- this.demoName = demoName;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
deleted file mode 100644
index e45873b..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.filter;
-
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.junit.Test;
-
-
-/**
- * @author shijia.wxr
- *
- */
-public class FilterAPITest {
-
- @Test
- public void testBuildSubscriptionData() throws Exception {
- SubscriptionData subscriptionData =
- FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
- System.out.println(subscriptionData);
- }
-
- @Test
- public void testSubscriptionData() throws Exception {
- SubscriptionData subscriptionData =
- FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
- subscriptionData.setFilterClassSource("java hello");
- String json = RemotingSerializable.toJson(subscriptionData, true);
- System.out.println(json);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java b/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
deleted file mode 100644
index 612df69..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.filter;
-
-import com.alibaba.rocketmq.common.filter.impl.Op;
-import com.alibaba.rocketmq.common.filter.impl.PolishExpr;
-import junit.framework.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-
-/**
- * @author lansheng.zj
- */
-public class PolishExprTest {
-
- private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8 && tag9";
- private PolishExpr polishExpr;
-
-
- public void init() {
- polishExpr = new PolishExpr();
- }
-
-
- @Test
- public void testReversePolish() {
- List<Op> antiPolishExpression = polishExpr.reversePolish(expression);
- System.out.println(antiPolishExpression);
- }
-
-
- @Test
- public void testReversePolish_Performance() {
- // prepare
- for (int i = 0; i < 100000; i++) {
- polishExpr.reversePolish(expression);
- }
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < 100000; i++) {
- polishExpr.reversePolish(expression);
- }
- long cost = System.currentTimeMillis() - start;
- System.out.println(cost);
- // System.out.println(cost / 100000F);
-
- Assert.assertTrue(cost < 500);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
deleted file mode 100644
index 32e3d98..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol;
-
-import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.junit.Test;
-
-
-public class ConsumeStatusTest {
-
- @Test
- public void decode_test() throws Exception {
- ConsumeStatus cs = new ConsumeStatus();
- cs.setConsumeFailedTPS(0L);
- String json = RemotingSerializable.toJson(cs, true);
- System.out.println(json);
- ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java b/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
deleted file mode 100644
index 749e7df..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol;
-
-/**
- * @author shijia.wxr
- */
-public class MQProtosHelperTest {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
new file mode 100644
index 0000000..6e7c17b
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.List;
+
+
+/**
+ * Checks that {@code MixAll.getLocalInetAddress()} reports both the loopback
+ * address and the address of the local host.
+ *
+ * @author lansheng.zj
+ */
+public class MixAllTest {
+
+    @Test
+    public void test() throws Exception {
+        final List<String> reportedAddresses = MixAll.getLocalInetAddress();
+        final InetAddress localHost = InetAddress.getLocalHost();
+        final String localHostIp = localHost.getHostAddress();
+
+        // Loopback is expected first, then whatever the JVM resolves as the local host.
+        Assert.assertTrue(reportedAddresses.contains("127.0.0.1"));
+        Assert.assertTrue(reportedAddresses.contains(localHostIp));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
new file mode 100644
index 0000000..b7509b1
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.junit.Test;
+
+
+/**
+ * Smoke test: prints whatever {@code RemotingUtil.getLocalAddress()} returns.
+ * It makes no assertion and only fails if the call throws.
+ */
+public class RemotingUtilTest {
+    @Test
+    public void test() throws Exception {
+        System.out.println(RemotingUtil.getLocalAddress());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
new file mode 100644
index 0000000..decd3d0
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Exercises UtilAll and MixAll helpers; several tests only print results, others assert on them.
+ */
+public class UtilAllTest {
+
+    /** Prints the value returned by UtilAll.currentStackTrace(). */
+    @Test
+    public void test_currentStackTrace() {
+        System.out.println(UtilAll.currentStackTrace());
+    }
+
+    /** Prints the code-source location of this test class, then that location's path. */
+    @Test
+    public void test_a() {
+        URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation();
+        System.out.println(url);
+        System.out.println(url.getPath());
+    }
+
+    /** Hands an empty Properties object and a fresh DemoConfig to MixAll.properties2Object. */
+    @Test
+    public void test_resetClassProperties() {
+        DemoConfig demoConfig = new DemoConfig();
+        MixAll.properties2Object(new Properties(), demoConfig);
+    }
+
+    /** Converts a DemoConfig to Properties and prints the string form of the result. */
+    @Test
+    public void test_properties2String() {
+        DemoConfig demoConfig = new DemoConfig();
+        Properties properties = MixAll.object2Properties(demoConfig);
+        System.out.println(MixAll.properties2String(properties));
+    }
+
+    /** Prints the value returned by UtilAll.timeMillisToHumanString(). */
+    @Test
+    public void test_timeMillisToHumanString() {
+        System.out.println(UtilAll.timeMillisToHumanString());
+    }
+
+    /** Two Properties objects holding the same two entries must be reported as equal. */
+    @Test
+    public void test_isPropertiesEqual() {
+        final Properties p1 = new Properties();
+        final Properties p2 = new Properties();
+
+        p1.setProperty("a", "1");
+        p1.setProperty("b", "2");
+
+        p2.setProperty("a", "1");
+        p2.setProperty("b", "2");
+        // p2.setProperty("c", "3");
+
+        assertTrue(MixAll.isPropertiesEqual(p1, p2));
+    }
+
+    /** Prints the value of UtilAll.getPid() and asserts that it is positive. */
+    @Test
+    public void test_getpid() {
+        int pid = UtilAll.getPid();
+
+        System.out.println("PID = " + pid);
+        assertTrue(pid > 0);
+    }
+
+    /** Of the five inputs, only the whitespace-only string is expected to be blank. */
+    @Test
+    public void test_isBlank() {
+        {
+            boolean result = UtilAll.isBlank("Hello ");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank(" Hello");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("He llo");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank(" ");
+            assertTrue(result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("Hello");
+            assertTrue(!result);
+        }
+    }
+
+    /** Bean-style fixture passed to the MixAll conversion calls in the tests above. */
+    static class DemoConfig {
+        private int demoWidth = 0;
+        private int demoLength = 0;
+        private boolean demoOK = false;
+        private String demoName = "haha";
+
+        public int getDemoWidth() {
+            return demoWidth;
+        }
+
+        public void setDemoWidth(int demoWidth) {
+            this.demoWidth = demoWidth;
+        }
+
+        public int getDemoLength() {
+            return demoLength;
+        }
+
+        public void setDemoLength(int demoLength) {
+            this.demoLength = demoLength;
+        }
+
+        public boolean isDemoOK() {
+            return demoOK;
+        }
+
+        public void setDemoOK(boolean demoOK) {
+            this.demoOK = demoOK;
+        }
+
+        public String getDemoName() {
+            return demoName;
+        }
+
+        /**
+         * Sets demoName. Renamed from the misspelled setDemoNfieldame so it pairs with getDemoName().
+         */
+        public void setDemoName(String demoName) {
+            this.demoName = demoName;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
new file mode 100644
index 0000000..b67c59a
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+
+/**
+ * Prints SubscriptionData built by FilterAPI, both directly and serialized to JSON.
+ *
+ * @author shijia.wxr
+ */
+public class FilterAPITest {
+    /** Builds subscription data from a three-tag expression and prints it. */
+    @Test
+    public void testBuildSubscriptionData() throws Exception {
+        System.out.println(FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3"));
+    }
+
+    /** Sets a filter class source on freshly built data and prints its JSON form. */
+    @Test
+    public void testSubscriptionData() throws Exception {
+        SubscriptionData built =
+            FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
+        built.setFilterClassSource("java hello");
+        String serialized = RemotingSerializable.toJson(built, true);
+        System.out.println(serialized);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java b/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java
new file mode 100644
index 0000000..f2ba2a3
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+import org.apache.rocketmq.common.filter.impl.Op;
+import org.apache.rocketmq.common.filter.impl.PolishExpr;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+
+/**
+ * Exercises PolishExpr.reversePolish on a fixed tag expression.
+ *
+ * @author lansheng.zj
+ */
+public class PolishExprTest {
+
+    private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8 && tag9";
+    private PolishExpr polishExpr;
+
+    /** Creates the PolishExpr under test; @Before makes JUnit run this ahead of every test method. */
+    @org.junit.Before
+    public void init() {
+        polishExpr = new PolishExpr();
+    }
+
+    /** Converts the expression and prints the resulting list of Op elements. */
+    @Test
+    public void testReversePolish() {
+        List<Op> antiPolishExpression = polishExpr.reversePolish(expression);
+        System.out.println(antiPolishExpression);
+    }
+
+    /** Times 100000 conversions after a warm-up. NOTE(review): the 500 ms bound depends on the machine. */
+    @Test
+    public void testReversePolish_Performance() {
+        // prepare
+        for (int i = 0; i < 100000; i++) {
+            polishExpr.reversePolish(expression);
+        }
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < 100000; i++) {
+            polishExpr.reversePolish(expression);
+        }
+        long cost = System.currentTimeMillis() - start;
+        System.out.println(cost);
+        Assert.assertTrue(cost < 500);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
new file mode 100644
index 0000000..79c6bbf
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol;
+
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+
+public class ConsumeStatusTest {
+    /** Round-trips a ConsumeStatus through JSON and checks that decoding yields an object. */
+    @Test
+    public void decode_test() throws Exception {
+        ConsumeStatus cs = new ConsumeStatus();
+        cs.setConsumeFailedTPS(0L);
+        String json = RemotingSerializable.toJson(cs, true);
+        System.out.println(json);
+        ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class);
+        org.junit.Assert.assertNotNull(fromJson);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java
new file mode 100644
index 0000000..cd56627
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol;
+
+/**
+ * @author shijia.wxr
+ */
+public class MQProtosHelperTest {
+    // Placeholder: this test class currently declares no test cases.
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/pom.xml
----------------------------------------------------------------------
diff --git a/example/pom.xml b/example/pom.xml
index 8e68a58..53aa6a6 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -18,7 +18,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>com.alibaba.rocketmq</groupId>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.0.0-SNAPSHOT</version>
</parent>