You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:33 UTC
[26/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
index da6401e..cf7cdcd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
@@ -20,7 +20,6 @@ public class SubscriptionSysFlag {
private final static int FLAG_UNIT = 0x1 << 0;
-
public static int buildSysFlag(final boolean unit) {
int sysFlag = 0;
@@ -31,22 +30,18 @@ public class SubscriptionSysFlag {
return sysFlag;
}
-
public static int setUnitFlag(final int sysFlag) {
return sysFlag | FLAG_UNIT;
}
-
public static int clearUnitFlag(final int sysFlag) {
return sysFlag & (~FLAG_UNIT);
}
-
public static boolean hasUnitFlag(final int sysFlag) {
return (sysFlag & FLAG_UNIT) == FLAG_UNIT;
}
-
public static void main(String[] args) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
index 1d804db..2c45150 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
@@ -27,7 +27,6 @@ public class TopicSysFlag {
private final static int FLAG_UNIT_SUB = 0x1 << 1;
-
public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) {
int sysFlag = 0;
@@ -42,37 +41,30 @@ public class TopicSysFlag {
return sysFlag;
}
-
public static int setUnitFlag(final int sysFlag) {
return sysFlag | FLAG_UNIT;
}
-
public static int clearUnitFlag(final int sysFlag) {
return sysFlag & (~FLAG_UNIT);
}
-
public static boolean hasUnitFlag(final int sysFlag) {
return (sysFlag & FLAG_UNIT) == FLAG_UNIT;
}
-
public static int setUnitSubFlag(final int sysFlag) {
return sysFlag | FLAG_UNIT_SUB;
}
-
public static int clearUnitSubFlag(final int sysFlag) {
return sysFlag & (~FLAG_UNIT_SUB);
}
-
public static boolean hasUnitSubFlag(final int sysFlag) {
return (sysFlag & FLAG_UNIT_SUB) == FLAG_UNIT_SUB;
}
-
public static void main(String[] args) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
index ab017f2..dcb9187 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
@@ -6,25 +6,24 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common.utils;
import io.netty.channel.Channel;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
public class ChannelUtil {
public static String getRemoteIp(Channel channel) {
- InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress();
+ InetSocketAddress inetSocketAddress = (InetSocketAddress)channel.remoteAddress();
if (inetSocketAddress == null) {
return "";
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
index fcd002c..0cc3463 100755
--- a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
@@ -6,20 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common.utils;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
-
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
@@ -27,21 +24,22 @@ import java.net.URL;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.List;
-
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
public class HttpTinyClient {
static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
- String encoding, long readTimeoutMs) throws IOException {
+ String encoding, long readTimeoutMs) throws IOException {
String encodedContent = encodingParams(paramValues, encoding);
url += (null == encodedContent) ? "" : ("?" + encodedContent);
HttpURLConnection conn = null;
try {
- conn = (HttpURLConnection) new URL(url).openConnection();
+ conn = (HttpURLConnection)new URL(url).openConnection();
conn.setRequestMethod("GET");
- conn.setConnectTimeout((int) readTimeoutMs);
- conn.setReadTimeout((int) readTimeoutMs);
+ conn.setConnectTimeout((int)readTimeoutMs);
+ conn.setReadTimeout((int)readTimeoutMs);
setHeaders(conn, headers, encoding);
conn.connect();
@@ -62,7 +60,7 @@ public class HttpTinyClient {
}
static private String encodingParams(List<String> paramValues, String encoding)
- throws UnsupportedEncodingException {
+ throws UnsupportedEncodingException {
StringBuilder sb = new StringBuilder();
if (null == paramValues) {
return null;
@@ -87,7 +85,6 @@ public class HttpTinyClient {
conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
-
String ts = String.valueOf(System.currentTimeMillis());
conn.addRequestProperty("Metaq-Client-RequestTS", ts);
}
@@ -110,15 +107,15 @@ public class HttpTinyClient {
* @throws java.io.IOException
*/
static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
- String encoding, long readTimeoutMs) throws IOException {
+ String encoding, long readTimeoutMs) throws IOException {
String encodedContent = encodingParams(paramValues, encoding);
HttpURLConnection conn = null;
try {
- conn = (HttpURLConnection) new URL(url).openConnection();
+ conn = (HttpURLConnection)new URL(url).openConnection();
conn.setRequestMethod("POST");
conn.setConnectTimeout(3000);
- conn.setReadTimeout((int) readTimeoutMs);
+ conn.setReadTimeout((int)readTimeoutMs);
conn.setDoOutput(true);
conn.setDoInput(true);
setHeaders(conn, headers, encoding);
@@ -145,7 +142,6 @@ public class HttpTinyClient {
final public int code;
final public String content;
-
public HttpResult(int code, String content) {
this.code = code;
this.content = content;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
index a5152f8..b569c24 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
@@ -6,40 +6,46 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common.utils;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.CharArrayWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.io.Writer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
-
+import org.apache.rocketmq.remoting.common.RemotingHelper;
public class IOTinyUtils {
static public String toString(InputStream input, String encoding) throws IOException {
return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader(
- input, encoding));
+ input, encoding));
}
-
static public String toString(Reader reader) throws IOException {
CharArrayWriter sw = new CharArrayWriter();
copy(reader, sw);
return sw.toString();
}
-
static public long copy(Reader input, Writer output) throws IOException {
char[] buffer = new char[1 << 12];
long count = 0;
@@ -50,7 +56,6 @@ public class IOTinyUtils {
return count;
}
-
/**
*/
@@ -58,7 +63,7 @@ public class IOTinyUtils {
BufferedReader reader = toBufferedReader(input);
List<String> list = new ArrayList<String>();
String line = null;
- for (;;) {
+ for (; ; ) {
line = reader.readLine();
if (null != line) {
list.add(line);
@@ -69,12 +74,10 @@ public class IOTinyUtils {
return list;
}
-
static private BufferedReader toBufferedReader(Reader reader) {
- return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader);
+ return reader instanceof BufferedReader ? (BufferedReader)reader : new BufferedReader(reader);
}
-
static public void copyFile(String source, String target) throws IOException {
File sf = new File(source);
if (!sf.exists()) {
@@ -102,7 +105,6 @@ public class IOTinyUtils {
}
}
-
public static void delete(File fileOrDir) throws IOException {
if (fileOrDir == null) {
return;
@@ -115,7 +117,6 @@ public class IOTinyUtils {
fileOrDir.delete();
}
-
/**
*/
@@ -149,7 +150,6 @@ public class IOTinyUtils {
}
}
-
public static void writeStringToFile(File file, String data, String encoding) throws IOException {
OutputStream os = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
index 0006f74..3205c64 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
@@ -6,23 +6,21 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common;
-import junit.framework.Assert;
-import org.junit.Test;
-
import java.net.InetAddress;
import java.util.List;
-
+import junit.framework.Assert;
+import org.junit.Test;
public class MixAllTest {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
index b7509b1..9211d37 100644
--- a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
@@ -6,20 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.junit.Test;
-
public class RemotingUtilTest {
@Test
public void test() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index decd3d0..b21d65b 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common;
-import org.junit.Test;
-
import java.net.URL;
import java.util.Properties;
+import org.junit.Test;
import static org.junit.Assert.assertTrue;
-
public class UtilAllTest {
@Test
@@ -32,7 +30,6 @@ public class UtilAllTest {
System.out.println(UtilAll.currentStackTrace());
}
-
@Test
public void test_a() {
URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation();
@@ -40,14 +37,12 @@ public class UtilAllTest {
System.out.println(url.getPath());
}
-
@Test
public void test_resetClassProperties() {
DemoConfig demoConfig = new DemoConfig();
MixAll.properties2Object(new Properties(), demoConfig);
}
-
@Test
public void test_properties2String() {
DemoConfig demoConfig = new DemoConfig();
@@ -55,13 +50,11 @@ public class UtilAllTest {
System.out.println(MixAll.properties2String(properties));
}
-
@Test
public void test_timeMillisToHumanString() {
System.out.println(UtilAll.timeMillisToHumanString());
}
-
@Test
public void test_isPropertiesEqual() {
final Properties p1 = new Properties();
@@ -77,7 +70,6 @@ public class UtilAllTest {
assertTrue(MixAll.isPropertiesEqual(p1, p2));
}
-
@Test
public void test_getpid() {
int pid = UtilAll.getPid();
@@ -86,7 +78,6 @@ public class UtilAllTest {
assertTrue(pid > 0);
}
-
@Test
public void test_isBlank() {
{
@@ -121,42 +112,34 @@ public class UtilAllTest {
private boolean demoOK = false;
private String demoName = "haha";
-
public int getDemoWidth() {
return demoWidth;
}
-
public void setDemoWidth(int demoWidth) {
this.demoWidth = demoWidth;
}
-
public int getDemoLength() {
return demoLength;
}
-
public void setDemoLength(int demoLength) {
this.demoLength = demoLength;
}
-
public boolean isDemoOK() {
return demoOK;
}
-
public void setDemoOK(boolean demoOK) {
this.demoOK = demoOK;
}
-
public String getDemoName() {
return demoName;
}
-
public void setDemoNfieldame(String demoName) {
this.demoName = demoName;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
index cf26efd..5a97db9 100644
--- a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common.filter;
@@ -21,7 +21,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
-
/**
*
*/
@@ -30,14 +29,14 @@ public class FilterAPITest {
@Test
public void testBuildSubscriptionData() throws Exception {
SubscriptionData subscriptionData =
- FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
+ FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
System.out.println(subscriptionData);
}
@Test
public void testSubscriptionData() throws Exception {
SubscriptionData subscriptionData =
- FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
+ FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
subscriptionData.setFilterClassSource("java hello");
String json = RemotingSerializable.toJson(subscriptionData, true);
System.out.println(json);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
index 79c6bbf..b511537 100644
--- a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.common.protocol;
@@ -21,7 +21,6 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
-
public class ConsumeStatusTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-a-s.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-async/broker-a-s.properties b/conf/2m-2s-async/broker-a-s.properties
index a4401f8..60fddf9 100644
--- a/conf/2m-2s-async/broker-a-s.properties
+++ b/conf/2m-2s-async/broker-a-s.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-a.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-async/broker-a.properties b/conf/2m-2s-async/broker-a.properties
index 6ca12f1..367f974 100644
--- a/conf/2m-2s-async/broker-a.properties
+++ b/conf/2m-2s-async/broker-a.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-b-s.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-async/broker-b-s.properties b/conf/2m-2s-async/broker-b-s.properties
index 51f8daf..bcd5a16 100644
--- a/conf/2m-2s-async/broker-b-s.properties
+++ b/conf/2m-2s-async/broker-b-s.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-b.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-async/broker-b.properties b/conf/2m-2s-async/broker-b.properties
index f7f3791..33b68fe 100644
--- a/conf/2m-2s-async/broker-b.properties
+++ b/conf/2m-2s-async/broker-b.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-a-s.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-sync/broker-a-s.properties b/conf/2m-2s-sync/broker-a-s.properties
index a4401f8..60fddf9 100644
--- a/conf/2m-2s-sync/broker-a-s.properties
+++ b/conf/2m-2s-sync/broker-a-s.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-a.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-sync/broker-a.properties b/conf/2m-2s-sync/broker-a.properties
index 135552d..b916f88 100644
--- a/conf/2m-2s-sync/broker-a.properties
+++ b/conf/2m-2s-sync/broker-a.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-b-s.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-sync/broker-b-s.properties b/conf/2m-2s-sync/broker-b-s.properties
index 51f8daf..bcd5a16 100644
--- a/conf/2m-2s-sync/broker-b-s.properties
+++ b/conf/2m-2s-sync/broker-b-s.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-b.properties
----------------------------------------------------------------------
diff --git a/conf/2m-2s-sync/broker-b.properties b/conf/2m-2s-sync/broker-b.properties
index 97162a7..44fcea7 100644
--- a/conf/2m-2s-sync/broker-b.properties
+++ b/conf/2m-2s-sync/broker-b.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-noslave/broker-a.properties
----------------------------------------------------------------------
diff --git a/conf/2m-noslave/broker-a.properties b/conf/2m-noslave/broker-a.properties
index 6ca12f1..367f974 100644
--- a/conf/2m-noslave/broker-a.properties
+++ b/conf/2m-noslave/broker-a.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-noslave/broker-b.properties
----------------------------------------------------------------------
diff --git a/conf/2m-noslave/broker-b.properties b/conf/2m-noslave/broker-b.properties
index f7f3791..33b68fe 100644
--- a/conf/2m-noslave/broker-b.properties
+++ b/conf/2m-noslave/broker-b.properties
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/broker.conf
----------------------------------------------------------------------
diff --git a/conf/broker.conf b/conf/broker.conf
index 6ca12f1..0c0b28b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-brokerClusterName=DefaultCluster
-brokerName=broker-a
-brokerId=0
-deleteWhen=04
-fileReservedTime=48
-brokerRole=ASYNC_MASTER
-flushDiskType=ASYNC_FLUSH
+brokerClusterName = DefaultCluster
+brokerName = broker-a
+brokerId = 0
+deleteWhen = 04
+fileReservedTime = 48
+brokerRole = ASYNC_MASTER
+flushDiskType = ASYNC_FLUSH
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_broker.xml
----------------------------------------------------------------------
diff --git a/conf/logback_broker.xml b/conf/logback_broker.xml
index 49e9d12..7a8f83b 100644
--- a/conf/logback_broker.xml
+++ b/conf/logback_broker.xml
@@ -28,7 +28,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -48,7 +48,7 @@
<maxIndex>20</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>128MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -71,7 +71,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -94,7 +94,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -117,7 +117,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>128MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -140,7 +140,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -163,7 +163,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -187,7 +187,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -210,7 +210,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -233,7 +233,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -253,7 +253,7 @@
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>500MB</maxFileSize>
</triggeringPolicy>
</appender>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_filtersrv.xml
----------------------------------------------------------------------
diff --git a/conf/logback_filtersrv.xml b/conf/logback_filtersrv.xml
index 8de4e08..9668795 100644
--- a/conf/logback_filtersrv.xml
+++ b/conf/logback_filtersrv.xml
@@ -28,7 +28,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -48,7 +48,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_namesrv.xml
----------------------------------------------------------------------
diff --git a/conf/logback_namesrv.xml b/conf/logback_namesrv.xml
index 7a60c76..45ccf4f 100644
--- a/conf/logback_namesrv.xml
+++ b/conf/logback_namesrv.xml
@@ -28,7 +28,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -48,7 +48,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_tools.xml
----------------------------------------------------------------------
diff --git a/conf/logback_tools.xml b/conf/logback_tools.xml
index addf211..35d33a5 100644
--- a/conf/logback_tools.xml
+++ b/conf/logback_tools.xml
@@ -28,7 +28,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
@@ -48,7 +48,7 @@
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
- class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/pom.xml
----------------------------------------------------------------------
diff --git a/example/pom.xml b/example/pom.xml
index 53aa6a6..efb1aa5 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 1fbb8a4..f810f5a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -17,6 +17,15 @@
package org.apache.rocketmq.example.benchmark;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -24,16 +33,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
public class Consumer {
@@ -77,17 +76,16 @@ public class Consumer {
Long[] end = snapshotList.getLast();
final long consumeTps =
- (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
- final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
- final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
+ (long)(((end[1] - begin[1]) / (double)(end[0] - begin[0])) * 1000L);
+ final double averageB2CRT = (end[2] - begin[2]) / (double)(end[1] - begin[1]);
+ final double averageS2CRT = (end[3] - begin[3]) / (double)(end[1] - begin[1]);
System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
- consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
+ consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
);
}
}
-
@Override
public void run() {
try {
@@ -106,7 +104,7 @@ public class Consumer {
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
long now = System.currentTimeMillis();
@@ -140,7 +138,6 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
-
opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false");
opt.setRequired(false);
options.addOption(opt);
@@ -148,7 +145,6 @@ public class Consumer {
return options;
}
-
public static void compareAndSetMax(final AtomicLong target, final long value) {
long prev = target.get();
while (value > prev) {
@@ -161,7 +157,6 @@ public class Consumer {
}
}
-
class StatsBenchmarkConsumer {
private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L);
@@ -173,41 +168,35 @@ class StatsBenchmarkConsumer {
private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
-
public Long[] createSnapshot() {
- Long[] snap = new Long[]{
- System.currentTimeMillis(),
- this.receiveMessageTotalCount.get(),
- this.born2ConsumerTotalRT.get(),
- this.store2ConsumerTotalRT.get(),
- this.born2ConsumerMaxRT.get(),
- this.store2ConsumerMaxRT.get(),
+ Long[] snap = new Long[] {
+ System.currentTimeMillis(),
+ this.receiveMessageTotalCount.get(),
+ this.born2ConsumerTotalRT.get(),
+ this.store2ConsumerTotalRT.get(),
+ this.born2ConsumerMaxRT.get(),
+ this.store2ConsumerMaxRT.get(),
};
return snap;
}
-
public AtomicLong getReceiveMessageTotalCount() {
return receiveMessageTotalCount;
}
-
public AtomicLong getBorn2ConsumerTotalRT() {
return born2ConsumerTotalRT;
}
-
public AtomicLong getStore2ConsumerTotalRT() {
return store2ConsumerTotalRT;
}
-
public AtomicLong getBorn2ConsumerMaxRT() {
return born2ConsumerMaxRT;
}
-
public AtomicLong getStore2ConsumerMaxRT() {
return store2ConsumerMaxRT;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 3b13f94..88e9a4f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -16,6 +16,17 @@
*/
package org.apache.rocketmq.example.benchmark;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
@@ -24,20 +35,8 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
-import java.io.UnsupportedEncodingException;
-import java.util.LinkedList;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
@@ -82,15 +81,14 @@ public class Producer {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
- final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
- final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
+ final long sendTps = (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
+ final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+ sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
}
}
-
@Override
public void run() {
try {
@@ -202,7 +200,6 @@ public class Producer {
}
}
-
class StatsBenchmarkProducer {
private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
@@ -216,46 +213,39 @@ class StatsBenchmarkProducer {
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
-
public Long[] createSnapshot() {
- Long[] snap = new Long[]{
- System.currentTimeMillis(),
- this.sendRequestSuccessCount.get(),
- this.sendRequestFailedCount.get(),
- this.receiveResponseSuccessCount.get(),
- this.receiveResponseFailedCount.get(),
- this.sendMessageSuccessTimeTotal.get(),
+ Long[] snap = new Long[] {
+ System.currentTimeMillis(),
+ this.sendRequestSuccessCount.get(),
+ this.sendRequestFailedCount.get(),
+ this.receiveResponseSuccessCount.get(),
+ this.receiveResponseFailedCount.get(),
+ this.sendMessageSuccessTimeTotal.get(),
};
return snap;
}
-
public AtomicLong getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
-
public AtomicLong getSendRequestFailedCount() {
return sendRequestFailedCount;
}
-
public AtomicLong getReceiveResponseSuccessCount() {
return receiveResponseSuccessCount;
}
-
public AtomicLong getReceiveResponseFailedCount() {
return receiveResponseFailedCount;
}
-
public AtomicLong getSendMessageSuccessTimeTotal() {
return sendMessageSuccessTimeTotal;
}
-
public AtomicLong getSendMessageMaxRT() {
return sendMessageMaxRT;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 43f159b..ce4b1ab 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -17,12 +17,6 @@
package org.apache.rocketmq.example.benchmark;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.client.producer.*;
-
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Timer;
@@ -30,6 +24,15 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer {
private static int threadCount;
@@ -37,7 +40,6 @@ public class TransactionProducer {
private static boolean ischeck;
private static boolean ischeckffalse;
-
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
@@ -71,16 +73,15 @@ public class TransactionProducer {
Long[] end = snapshotList.getLast();
final long sendTps =
- (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
- final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
+ (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
+ final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
System.out.printf(
- "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]);
+ "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
+ sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]);
}
}
-
@Override
public void run() {
try {
@@ -92,7 +93,7 @@ public class TransactionProducer {
}, 10000, 10000);
final TransactionCheckListener transactionCheckListener =
- new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
+ new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionCheckListener(transactionCheckListener);
@@ -110,7 +111,7 @@ public class TransactionProducer {
// Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult =
- producer.sendMessageInTransaction(msg, tranExecuter, null);
+ producer.sendMessageInTransaction(msg, tranExecuter, null);
if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
@@ -121,8 +122,8 @@ public class TransactionProducer {
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
while (currentRT > prevMaxRT) {
boolean updated =
- statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
- currentRT);
+ statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
+ currentRT);
if (updated)
break;
@@ -137,7 +138,6 @@ public class TransactionProducer {
}
}
-
private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
Message msg = new Message();
msg.setTopic("BenchmarkTest");
@@ -153,17 +153,14 @@ public class TransactionProducer {
}
}
-
class TransactionExecuterBImpl implements LocalTransactionExecuter {
private boolean ischeck;
-
public TransactionExecuterBImpl(boolean ischeck) {
this.ischeck = ischeck;
}
-
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
if (ischeck) {
@@ -173,19 +170,16 @@ class TransactionExecuterBImpl implements LocalTransactionExecuter {
}
}
-
class TransactionCheckListenerBImpl implements TransactionCheckListener {
private boolean ischeckffalse;
private StatsBenchmarkTProducer statsBenchmarkTProducer;
-
public TransactionCheckListenerBImpl(boolean ischeckffalse,
- StatsBenchmarkTProducer statsBenchmarkTProducer) {
+ StatsBenchmarkTProducer statsBenchmarkTProducer) {
this.ischeckffalse = ischeckffalse;
this.statsBenchmarkTProducer = statsBenchmarkTProducer;
}
-
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
@@ -198,7 +192,6 @@ class TransactionCheckListenerBImpl implements TransactionCheckListener {
}
}
-
class StatsBenchmarkTProducer {
private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
@@ -214,51 +207,43 @@ class StatsBenchmarkTProducer {
private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L);
-
public Long[] createSnapshot() {
- Long[] snap = new Long[]{
- System.currentTimeMillis(),
- this.sendRequestSuccessCount.get(),
- this.sendRequestFailedCount.get(),
- this.receiveResponseSuccessCount.get(),
- this.receiveResponseFailedCount.get(),
- this.sendMessageSuccessTimeTotal.get(),
- this.checkRequestSuccessCount.get()};
+ Long[] snap = new Long[] {
+ System.currentTimeMillis(),
+ this.sendRequestSuccessCount.get(),
+ this.sendRequestFailedCount.get(),
+ this.receiveResponseSuccessCount.get(),
+ this.receiveResponseFailedCount.get(),
+ this.sendMessageSuccessTimeTotal.get(),
+ this.checkRequestSuccessCount.get()};
return snap;
}
-
public AtomicLong getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
-
public AtomicLong getSendRequestFailedCount() {
return sendRequestFailedCount;
}
-
public AtomicLong getReceiveResponseSuccessCount() {
return receiveResponseSuccessCount;
}
-
public AtomicLong getReceiveResponseFailedCount() {
return receiveResponseFailedCount;
}
-
public AtomicLong getSendMessageSuccessTimeTotal() {
return sendMessageSuccessTimeTotal;
}
-
public AtomicLong getSendMessageMaxRT() {
return sendMessageMaxRT;
}
-
public AtomicLong getCheckRequestSuccessCount() {
return checkRequestSuccessCount;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
index aa62a1e..6301b3b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.broadcast;
+import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -25,8 +26,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import java.util.List;
-
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
@@ -42,7 +41,7 @@ public class PushConsumer {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
index d0a41f1..8d0fbe4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.filter;
+import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -24,9 +25,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
-
-
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
@@ -34,13 +32,13 @@ public class Consumer {
String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
consumer.subscribe("TopicFilter7", "org.apache.rocketmq.example.filter.MessageFilterImpl",
- filterCode);
+ filterCode);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
index d58c28d..a2dba6c 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
@@ -30,9 +30,9 @@ public class Producer {
try {
for (int i = 0; i < 6000000; i++) {
Message msg = new Message("TopicFilter7",
- "TagA",
- "OrderID001",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ "TagA",
+ "OrderID001",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("SequenceId", String.valueOf(i));
SendResult sendResult = producer.send(msg);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
index a6a3aca..ec7d6ef 100644
--- a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
@@ -16,22 +16,20 @@
*/
package org.apache.rocketmq.example.operation;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
@@ -51,10 +49,9 @@ public class Consumer {
consumer.registerMessageListener(new MessageListenerConcurrently() {
AtomicLong consumeTimes = new AtomicLong(0);
-
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
long currentTimes = this.consumeTimes.incrementAndGet();
System.out.printf("%-8d %s%n", currentTimes, msgs);
if (Boolean.parseBoolean(returnFailedHalf)) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
index 54e256b..663acd0 100644
--- a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
@@ -6,22 +6,27 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.example.operation;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.commons.cli.*;
public class Producer {
@@ -42,10 +47,10 @@ public class Producer {
for (int i = 0; i < Integer.parseInt(msgCount); i++) {
try {
Message msg = new Message(
- topic,
- tags,
- keys,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+ topic,
+ tags,
+ keys,
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%-8d %s%n", i, sendResult);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
index 7ddfbf7..0a25402 100644
--- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.example.ordermessage;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
@@ -24,10 +26,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-
public class Consumer {
public static void main(String[] args) throws MQClientException {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
index 84c1da4..7abbb5a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.example.ordermessage;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -27,25 +29,22 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
- String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+ String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
- new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+ new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- Integer id = (Integer) arg;
+ Integer id = (Integer)arg;
int index = id % mqs.size();
return mqs.get(index);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index 43566f0..513c269 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.quickstart;
+import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -24,8 +25,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
-
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
@@ -39,7 +38,7 @@ public class Consumer {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index f6bd5df..a74d9df 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -32,8 +32,8 @@ public class Producer {
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest",
- "TagA",
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+ "TagA",
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
index 68dbb67..d4d9975 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.simple;
+import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
@@ -23,9 +24,6 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import java.io.UnsupportedEncodingException;
-
-
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
@@ -37,9 +35,9 @@ public class AsyncProducer {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
- "TagA",
- "OrderID188",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java
index 2b4ce23..54bf54f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java
@@ -6,26 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.example.simple;
-import org.apache.rocketmq.common.message.MessageExt;
-
import java.util.TreeMap;
-
+import org.apache.rocketmq.common.message.MessageExt;
public class CachedQueue {
private final TreeMap<Long, MessageExt> msgCachedTable = new TreeMap<Long, MessageExt>();
-
public TreeMap<Long, MessageExt> getMsgCachedTable() {
return msgCachedTable;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index b035d57..590bcee 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -22,7 +22,6 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
@@ -34,9 +33,9 @@ public class Producer {
try {
{
Message msg = new Message("TopicTest",
- "TagA",
- "OrderID188",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index 8c9ba15..c468f3a 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -16,19 +16,17 @@
*/
package org.apache.rocketmq.example.simple;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
-
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
@@ -41,7 +39,7 @@ public class PullConsumer {
while (true) {
try {
PullResult pullResult =
- consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
+ consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
index d38d679..b6bc8d2 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
@@ -26,7 +26,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-
public class PullScheduleService {
public static void main(String[] args) throws MQClientException {
@@ -59,7 +58,6 @@ public class PullScheduleService {
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
-
context.setPullNextDelayTimeMillis(100);
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
index 5929aff..78bb922 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.simple;
+import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -24,9 +25,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
-
-
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {