You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:29 UTC
[22/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
index 125fbd3..bdb02c6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
@@ -6,27 +6,27 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
public enum LanguageCode {
- JAVA((byte) 0),
- CPP((byte) 1),
- DOTNET((byte) 2),
- PYTHON((byte) 3),
- DELPHI((byte) 4),
- ERLANG((byte) 5),
- RUBY((byte) 6),
- OTHER((byte) 7),
- HTTP((byte) 8);
+ JAVA((byte)0),
+ CPP((byte)1),
+ DOTNET((byte)2),
+ PYTHON((byte)3),
+ DELPHI((byte)4),
+ ERLANG((byte)5),
+ RUBY((byte)6),
+ OTHER((byte)7),
+ HTTP((byte)8);
private byte code;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 60dd498..6b253dc 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -6,24 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@@ -31,22 +24,26 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
+ public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
private static final int RPC_ONEWAY = 1; // 0, RPC
-
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
- new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
+ new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
+ // 1, Oneway
// 1, RESPONSE_COMMAND
private static final Map<Field, Annotation> NOT_NULL_ANNOTATION_CACHE = new HashMap<Field, Annotation>();
- // 1, Oneway
-
private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
@@ -56,7 +53,6 @@ public class RemotingCommand {
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
- public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);
@@ -93,7 +89,6 @@ public class RemotingCommand {
*/
private transient byte[] body;
-
protected RemotingCommand() {
}
@@ -148,11 +143,6 @@ public class RemotingCommand {
return cmd;
}
- public void markResponseType() {
- int bits = 1 << RPC_TYPE;
- this.flag |= bits;
- }
-
public static RemotingCommand createResponseCommand(int code, String remark) {
return createResponseCommand(code, remark, null);
}
@@ -205,7 +195,7 @@ public class RemotingCommand {
}
public static SerializeType getProtocolType(int source) {
- return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
+ return SerializeType.valueOf((byte)((source >> 24) & 0xFF));
}
public static int createNewRequestId() {
@@ -229,6 +219,21 @@ public class RemotingCommand {
return true;
}
+ public static byte[] markProtocolType(int source, SerializeType type) {
+ byte[] result = new byte[4];
+
+ result[0] = type.getCode();
+ result[1] = (byte)((source >> 16) & 0xFF);
+ result[2] = (byte)((source >> 8) & 0xFF);
+ result[3] = (byte)(source & 0xFF);
+ return result;
+ }
+
+ public void markResponseType() {
+ int bits = 1 << RPC_TYPE;
+ this.flag |= bits;
+ }
+
public CommandCustomHeader readCustomHeader() {
return customHeader;
}
@@ -376,16 +381,6 @@ public class RemotingCommand {
}
}
- public static byte[] markProtocolType(int source, SerializeType type) {
- byte[] result = new byte[4];
-
- result[0] = type.getCode();
- result[1] = (byte) ((source >> 16) & 0xFF);
- result[2] = (byte) ((source >> 8) & 0xFF);
- result[3] = (byte) (source & 0xFF);
- return result;
- }
-
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
@@ -550,16 +545,14 @@ public class RemotingCommand {
@Override
public String toString() {
return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
- + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
- + serializeTypeCurrentRPC + "]";
+ + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ + serializeTypeCurrentRPC + "]";
}
-
public SerializeType getSerializeTypeCurrentRPC() {
return serializeTypeCurrentRPC;
}
-
public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
index 3adf06f..de4a5c9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
index e543ce1..8a5d76e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -6,21 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.JSON;
-
import java.nio.charset.Charset;
-
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java
index e92bc49..f2836fe 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
index 6b0d825..64b37db 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
@@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-
/**
*
*/
@@ -52,11 +51,11 @@ public class RocketMQSerializable {
// ################### content
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
- headerBuffer.putShort((short) cmd.getCode());
+ headerBuffer.putShort((short)cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
- headerBuffer.putShort((short) cmd.getVersion());
+ headerBuffer.putShort((short)cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
@@ -92,10 +91,10 @@ public class RocketMQSerializable {
Map.Entry<String, String> entry = it.next();
if (entry.getKey() != null && entry.getValue() != null) {
kvLength =
- // keySize + Key
- 2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length
- // valSize + val
- + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length;
+ // keySize + Key
+ 2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length
+ // valSize + val
+ + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length;
totalLength += kvLength;
}
}
@@ -110,7 +109,7 @@ public class RocketMQSerializable {
key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8);
val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8);
- content.putShort((short) key.length);
+ content.putShort((short)key.length);
content.put(key);
content.putInt(val.length);
@@ -124,18 +123,18 @@ public class RocketMQSerializable {
private static int calTotalLen(int remark, int ext) {
// int code(~32767)
int length = 2
- // LanguageCode language
- + 1
- // int version(~32767)
- + 2
- // int opaque
- + 4
- // int flag
- + 4
- // String remark
- + 4 + remark
- // HashMap<String, String> extFields
- + 4 + ext;
+ // LanguageCode language
+ + 1
+ // int version(~32767)
+ + 2
+ // int opaque
+ + 4
+ // int flag
+ + 4
+ // String remark
+ + 4 + remark
+ // HashMap<String, String> extFields
+ + 4 + ext;
return length;
}
@@ -192,12 +191,11 @@ public class RocketMQSerializable {
byteBuffer.get(valContent);
map.put(new String(keyContent, RemotingSerializable.CHARSET_UTF8), new String(valContent,
- RemotingSerializable.CHARSET_UTF8));
+ RemotingSerializable.CHARSET_UTF8));
}
return map;
}
-
public static boolean isBlank(String str) {
int strLen;
if (str == null || (strLen = str.length()) == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
index cebd48f..6bfd42c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
@@ -6,20 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
public enum SerializeType {
- JSON((byte) 0),
- ROCKETMQ((byte) 1);
+ JSON((byte)0),
+ ROCKETMQ((byte)1);
private byte code;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java
----------------------------------------------------------------------
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java
index 3b73e46..984ecd1 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: MixTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -22,7 +24,6 @@ package org.apache.rocketmq.remoting;
import org.junit.Test;
-
public class MixTest {
@Test
public void test_extFieldsValue() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java
----------------------------------------------------------------------
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java
index 15330bc..15a9aa3 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -20,47 +22,32 @@
*/
package org.apache.rocketmq.remoting;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.Executors;
import org.apache.rocketmq.remoting.annotation.CFNullable;
-import org.apache.rocketmq.remoting.exception.*;
-import org.apache.rocketmq.remoting.netty.*;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
import org.junit.Test;
-import java.util.concurrent.Executors;
-
import static org.junit.Assert.assertTrue;
-
public class NettyRPCTest {
- @Test
- public void test_RPC_Sync() throws InterruptedException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException {
- RemotingServer server = createRemotingServer();
- RemotingClient client = createRemotingClient();
-
- for (int i = 0; i < 100; i++) {
- TestRequestHeader requestHeader = new TestRequestHeader();
- requestHeader.setCount(i);
- requestHeader.setMessageTitle("HelloMessageTitle");
- RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
- RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000);
- System.out.println("invoke result = " + response);
- assertTrue(response != null);
- }
-
- client.shutdown();
- server.shutdown();
- System.out.println("-----------------------------------------------------------------");
- }
-
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config);
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
private int i = 0;
-
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
System.out.println("processRequest=" + request + " " + (i++));
@@ -85,8 +72,29 @@ public class NettyRPCTest {
}
@Test
+ public void test_RPC_Sync() throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ RemotingServer server = createRemotingServer();
+ RemotingClient client = createRemotingClient();
+
+ for (int i = 0; i < 100; i++) {
+ TestRequestHeader requestHeader = new TestRequestHeader();
+ requestHeader.setCount(i);
+ requestHeader.setMessageTitle("HelloMessageTitle");
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
+ RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000);
+ System.out.println("invoke result = " + response);
+ assertTrue(response != null);
+ }
+
+ client.shutdown();
+ server.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+ @Test
public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException,
- RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
+ RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
RemotingServer server = createRemotingServer();
RemotingClient client = createRemotingClient();
@@ -101,10 +109,9 @@ public class NettyRPCTest {
System.out.println("-----------------------------------------------------------------");
}
-
@Test
public void test_RPC_Async() throws InterruptedException, RemotingConnectException,
- RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
+ RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
RemotingServer server = createRemotingServer();
RemotingClient client = createRemotingClient();
@@ -126,10 +133,9 @@ public class NettyRPCTest {
System.out.println("-----------------------------------------------------------------");
}
-
@Test
public void test_server_call_client() throws InterruptedException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException {
+ RemotingSendRequestException, RemotingTimeoutException {
final RemotingServer server = createRemotingServer();
final RemotingClient client = createRemotingClient();
@@ -183,7 +189,6 @@ public class NettyRPCTest {
}
-
class TestRequestHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
@@ -191,33 +196,27 @@ class TestRequestHeader implements CommandCustomHeader {
@CFNullable
private String messageTitle;
-
@Override
public void checkFields() throws RemotingCommandException {
}
-
public Integer getCount() {
return count;
}
-
public void setCount(Integer count) {
this.count = count;
}
-
public String getMessageTitle() {
return messageTitle;
}
-
public void setMessageTitle(String messageTitle) {
this.messageTitle = messageTitle;
}
}
-
class TestResponseHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
@@ -246,5 +245,4 @@ class TestResponseHeader implements CommandCustomHeader {
this.messageTitle = messageTitle;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java
----------------------------------------------------------------------
diff --git a/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java b/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java
index 3c27697..16aa9b1 100644
--- a/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java
+++ b/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/**
@@ -22,7 +22,6 @@ package org.apache.rocketmq.subclass;
import org.junit.Test;
-
public class TestSubClassAuto {
@Test
public void test_sub() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/srvutil/pom.xml
----------------------------------------------------------------------
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 0873ee8..af884cb 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
----------------------------------------------------------------------
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
index 0a25dd1..1774150 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
@@ -6,19 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.srvutil;
-import org.apache.commons.cli.*;
-
import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
public class ServerUtil {
@@ -28,17 +32,16 @@ public class ServerUtil {
options.addOption(opt);
opt =
- new Option("n", "namesrvAddr", true,
- "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
+ new Option("n", "namesrvAddr", true,
+ "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
opt.setRequired(false);
options.addOption(opt);
return options;
}
-
public static CommandLine parseCmdLine(final String appName, String[] args, Options options,
- CommandLineParser parser) {
+ CommandLineParser parser) {
HelpFormatter hf = new HelpFormatter();
hf.setWidth(110);
CommandLine commandLine = null;
@@ -55,14 +58,12 @@ public class ServerUtil {
return commandLine;
}
-
public static void printCommandLineHelp(final String appName, final Options options) {
HelpFormatter hf = new HelpFormatter();
hf.setWidth(110);
hf.printHelp(appName, options, true);
}
-
public static Properties commandLine2Properties(final CommandLine commandLine) {
Properties properties = new Properties();
Option[] opts = commandLine.getOptions();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/pom.xml
----------------------------------------------------------------------
diff --git a/store/pom.xml b/store/pom.xml
index 10b13b9..29be589 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index 94362ea..27b957f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.store;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.store.config.BrokerRole;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.util.ServiceLoader;
@@ -30,33 +23,35 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Create MappedFile in advance
- *
*/
public class AllocateMappedFileService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static int waitTimeOut = 1000 * 5;
private ConcurrentHashMap<String, AllocateRequest> requestTable =
- new ConcurrentHashMap<String, AllocateRequest>();
+ new ConcurrentHashMap<String, AllocateRequest>();
private PriorityBlockingQueue<AllocateRequest> requestQueue =
- new PriorityBlockingQueue<AllocateRequest>();
+ new PriorityBlockingQueue<AllocateRequest>();
private volatile boolean hasException = false;
private DefaultMessageStore messageStore;
-
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}
-
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
- && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
+ && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
}
}
@@ -67,7 +62,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextFilePath);
return null;
}
@@ -83,7 +78,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
@@ -119,13 +114,11 @@ public class AllocateMappedFileService extends ServiceThread {
return null;
}
-
@Override
public String getServiceName() {
return AllocateMappedFileService.class.getSimpleName();
}
-
public void shutdown() {
this.stopped = true;
this.thread.interrupt();
@@ -144,7 +137,6 @@ public class AllocateMappedFileService extends ServiceThread {
}
}
-
public void run() {
log.info(this.getServiceName() + " service started");
@@ -154,7 +146,6 @@ public class AllocateMappedFileService extends ServiceThread {
log.info(this.getServiceName() + " service end");
}
-
/**
* Only interrupted by the external thread, will return false
*/
@@ -166,12 +157,12 @@ public class AllocateMappedFileService extends ServiceThread {
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
- + req.getFileSize());
+ + req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
- + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
+ + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
@@ -195,16 +186,16 @@ public class AllocateMappedFileService extends ServiceThread {
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
- + " " + req.getFilePath() + " " + req.getFileSize());
+ + " " + req.getFilePath() + " " + req.getFileSize());
}
// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
- .getMapedFileSizeCommitLog()
- &&
- this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+ .getMapedFileSizeCommitLog()
+ &&
+ this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
- this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
+ this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
@@ -239,53 +230,43 @@ public class AllocateMappedFileService extends ServiceThread {
private CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile MappedFile mappedFile = null;
-
public AllocateRequest(String filePath, int fileSize) {
this.filePath = filePath;
this.fileSize = fileSize;
}
-
public String getFilePath() {
return filePath;
}
-
public void setFilePath(String filePath) {
this.filePath = filePath;
}
-
public int getFileSize() {
return fileSize;
}
-
public void setFileSize(int fileSize) {
this.fileSize = fileSize;
}
-
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
-
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
-
public MappedFile getMappedFile() {
return mappedFile;
}
-
public void setMappedFile(MappedFile mappedFile) {
this.mappedFile = mappedFile;
}
-
public int compareTo(AllocateRequest other) {
if (this.fileSize < other.fileSize)
return 1;
@@ -308,7 +289,6 @@ public class AllocateMappedFileService extends ServiceThread {
// other.fileSize ? -1 : 0;
}
-
@Override
public int hashCode() {
final int prime = 31;
@@ -318,7 +298,6 @@ public class AllocateMappedFileService extends ServiceThread {
return result;
}
-
@Override
public boolean equals(Object obj) {
if (this == obj)
@@ -327,7 +306,7 @@ public class AllocateMappedFileService extends ServiceThread {
return false;
if (getClass() != obj.getClass())
return false;
- AllocateRequest other = (AllocateRequest) obj;
+ AllocateRequest other = (AllocateRequest)obj;
if (filePath == null) {
if (other.filePath != null)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
index 0e8678c..6d158d3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
import java.nio.ByteBuffer;
-
/**
* Write messages callback interface
*
@@ -36,5 +35,5 @@ public interface AppendMessageCallback {
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
- final int maxBlank, final MessageExtBrokerInner msg);
+ final int maxBlank, final MessageExtBrokerInner msg);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
index 8541208..965097f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
@@ -40,7 +40,7 @@ public class AppendMessageResult {
}
public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,
- long storeTimestamp, long logicsOffset, long pagecacheRT) {
+ long storeTimestamp, long logicsOffset, long pagecacheRT) {
this.status = status;
this.wroteOffset = wroteOffset;
this.wroteBytes = wroteBytes;
@@ -62,62 +62,50 @@ public class AppendMessageResult {
return this.status == AppendMessageStatus.PUT_OK;
}
-
public AppendMessageStatus getStatus() {
return status;
}
-
public void setStatus(AppendMessageStatus status) {
this.status = status;
}
-
public long getWroteOffset() {
return wroteOffset;
}
-
public void setWroteOffset(long wroteOffset) {
this.wroteOffset = wroteOffset;
}
-
public int getWroteBytes() {
return wroteBytes;
}
-
public void setWroteBytes(int wroteBytes) {
this.wroteBytes = wroteBytes;
}
-
public String getMsgId() {
return msgId;
}
-
public void setMsgId(String msgId) {
this.msgId = msgId;
}
-
public long getStoreTimestamp() {
return storeTimestamp;
}
-
public void setStoreTimestamp(long storeTimestamp) {
this.storeTimestamp = storeTimestamp;
}
-
public long getLogicsOffset() {
return logicsOffset;
}
-
public void setLogicsOffset(long logicsOffset) {
this.logicsOffset = logicsOffset;
}
@@ -125,13 +113,13 @@ public class AppendMessageResult {
@Override
public String toString() {
return "AppendMessageResult{" +
- "status=" + status +
- ", wroteOffset=" + wroteOffset +
- ", wroteBytes=" + wroteBytes +
- ", msgId='" + msgId + '\'' +
- ", storeTimestamp=" + storeTimestamp +
- ", logicsOffset=" + logicsOffset +
- ", pagecacheRT=" + pagecacheRT +
- '}';
+ "status=" + status +
+ ", wroteOffset=" + wroteOffset +
+ ", wroteBytes=" + wroteBytes +
+ ", msgId='" + msgId + '\'' +
+ ", storeTimestamp=" + storeTimestamp +
+ ", logicsOffset=" + logicsOffset +
+ ", pagecacheRT=" + pagecacheRT +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java
index 34f70b1..39cf9fa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index ddd6be3..17625f4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -16,6 +16,15 @@
*/
package org.apache.rocketmq.store;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,20 +40,8 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-
/**
* Store all metadata downtime for recovery, data protection reliability
- *
*/
public class CommitLog {
// Message's MAGIC CODE daa320a7
@@ -72,7 +69,7 @@ public class CommitLog {
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
- defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
+ defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
@@ -126,17 +123,15 @@ public class CommitLog {
return this.mappedFileQueue.remainHowManyDataToFlush();
}
-
public int deleteExpiredFile(//
- final long expiredTime, //
- final int deleteFilesInterval, //
- final long intervalForcibly, //
- final boolean cleanImmediately//
+ final long expiredTime, //
+ final int deleteFilesInterval, //
+ final long intervalForcibly, //
+ final boolean cleanImmediately//
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
-
/**
* Read CommitLog data, use data replication
*/
@@ -144,12 +139,11 @@ public class CommitLog {
return this.getData(offset, offset == 0);
}
-
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
- int pos = (int) (offset % mappedFileSize);
+ int pos = (int)(offset % mappedFileSize);
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
@@ -157,7 +151,6 @@ public class CommitLog {
return null;
}
-
/**
* When the normal exit, data recovery, all memory data have been flush
*/
@@ -227,8 +220,7 @@ public class CommitLog {
/**
* check the message and returns the message size
*
- * @return 0 Come the end of the file // >0 Normal messages // -1 Message
- * checksum failure
+ * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
try {
@@ -340,7 +332,7 @@ public class CommitLog {
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
- storeTimestamp);
+ storeTimestamp);
}
}
}
@@ -354,23 +346,23 @@ public class CommitLog {
doNothingForDeadCode(byteBuffer1);
doNothingForDeadCode(byteBuffer2);
log.error(
- "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
- totalSize, readLength, bodyLen, topicLen, propertiesLength);
+ "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
+ totalSize, readLength, bodyLen, topicLen, propertiesLength);
return new DispatchRequest(totalSize, false/* success */);
}
return new DispatchRequest(//
- topic, // 1
- queueId, // 2
- physicOffset, // 3
- totalSize, // 4
- tagsCode, // 5
- storeTimestamp, // 6
- queueOffset, // 7
- keys, // 8
- uniqKey, //9
- sysFlag, // 9
- preparedTransactionOffset// 10
+ topic, // 1
+ queueId, // 2
+ physicOffset, // 3
+ totalSize, // 4
+ tagsCode, // 5
+ storeTimestamp, // 6
+ queueOffset, // 7
+ keys, // 8
+ uniqKey, //9
+ sysFlag, // 9
+ preparedTransactionOffset// 10
);
} catch (Exception e) {
}
@@ -380,24 +372,24 @@ public class CommitLog {
private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 // 1 TOTALSIZE
- + 4 // 2 MAGICCODE
- + 4 // 3 BODYCRC
- + 4 // 4 QUEUEID
- + 4 // 5 FLAG
- + 8 // 6 QUEUEOFFSET
- + 8 // 7 PHYSICALOFFSET
- + 4 // 8 SYSFLAG
- + 8 // 9 BORNTIMESTAMP
- + 8 // 10 BORNHOST
- + 8 // 11 STORETIMESTAMP
- + 8 // 12 STOREHOSTADDRESS
- + 4 // 13 RECONSUMETIMES
- + 8 // 14 Prepared Transaction Offset
- + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY
- + 1 + topicLength // 15 TOPIC
- + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16
- // propertiesLength
- + 0;
+ + 4 // 2 MAGICCODE
+ + 4 // 3 BODYCRC
+ + 4 // 4 QUEUEID
+ + 4 // 5 FLAG
+ + 8 // 6 QUEUEOFFSET
+ + 8 // 7 PHYSICALOFFSET
+ + 4 // 8 SYSFLAG
+ + 8 // 9 BORNTIMESTAMP
+ + 8 // 10 BORNHOST
+ + 8 // 11 STORETIMESTAMP
+ + 8 // 12 STOREHOSTADDRESS
+ + 4 // 13 RECONSUMETIMES
+ + 8 // 14 Prepared Transaction Offset
+ + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY
+ + 1 + topicLength // 15 TOPIC
+ + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16
+ // propertiesLength
+ + 0;
return msgLen;
}
@@ -441,7 +433,6 @@ public class CommitLog {
if (size > 0) {
mappedFileOffset += size;
-
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
@@ -505,18 +496,18 @@ public class CommitLog {
}
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
- && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
+ && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}", //
- storeTimestamp, //
- UtilAll.timeMillisToHumanString(storeTimestamp));
+ storeTimestamp, //
+ UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}", //
- storeTimestamp, //
- UtilAll.timeMillisToHumanString(storeTimestamp));
+ storeTimestamp, //
+ UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
@@ -552,7 +543,7 @@ public class CommitLog {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+ || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
@@ -636,7 +627,6 @@ public class CommitLog {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
-
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
@@ -647,14 +637,14 @@ public class CommitLog {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
+ final GroupCommitService service = (GroupCommitService)this.flushCommitLogService;
if (msg.isWaitStoreMsgOK()) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
- + " client address: " + msg.getBornHostString());
+ + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
@@ -684,11 +674,11 @@ public class CommitLog {
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
- // TODO
- request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+ // TODO
+ request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
- + msg.getTags() + " client address: " + msg.getBornHostString());
+ + msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
@@ -739,7 +729,7 @@ public class CommitLog {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
- int pos = (int) (offset % mappedFileSize);
+ int pos = (int)(offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
@@ -754,17 +744,14 @@ public class CommitLog {
return topicQueueTable;
}
-
public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
this.topicQueueTable = topicQueueTable;
}
-
public void destroy() {
this.mappedFileQueue.destroy();
}
-
public boolean appendData(long startOffset, byte[] data) {
lockForPutMessage(); //spin...
try {
@@ -780,7 +767,6 @@ public class CommitLog {
}
}
-
public boolean retryDeleteFirstFile(final long intervalForcibly) {
return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
}
@@ -798,6 +784,72 @@ public class CommitLog {
mappedFileQueue.checkSelf();
}
+ public long lockTimeMills() {
+ long diff = 0;
+ long begin = this.beginTimeInLock;
+ if (begin > 0) {
+ diff = this.defaultMessageStore.now() - begin;
+ }
+
+ if (diff < 0) {
+ diff = 0;
+ }
+
+ return diff;
+ }
+
+ /**
+ * Spin util acquired the lock.
+ */
+ private void lockForPutMessage() {
+ if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
+ putMessageNormalLock.lock();
+ } else {
+ boolean flag;
+ do {
+ flag = this.putMessageSpinLock.compareAndSet(true, false);
+ }
+ while (!flag);
+ }
+ }
+
+ private void releasePutMessageLock() {
+ if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
+ putMessageNormalLock.unlock();
+ } else {
+ this.putMessageSpinLock.compareAndSet(false, true);
+ }
+ }
+
+ public static class GroupCommitRequest {
+ private final long nextOffset;
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+ private volatile boolean flushOK = false;
+
+ public GroupCommitRequest(long nextOffset) {
+ this.nextOffset = nextOffset;
+ }
+
+ public long getNextOffset() {
+ return nextOffset;
+ }
+
+ public void wakeupCustomer(final boolean flushOK) {
+ this.flushOK = flushOK;
+ this.countDownLatch.countDown();
+ }
+
+ public boolean waitForFlush(long timeout) {
+ try {
+ this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+ return this.flushOK;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+
abstract class FlushCommitLogService extends ServiceThread {
protected static final int RETRY_TIMES_OVER = 10;
}
@@ -820,7 +872,7 @@ public class CommitLog {
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
int commitDataThoroughInterval =
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
+ CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
@@ -859,7 +911,6 @@ public class CommitLog {
private long lastFlushTimestamp = 0;
private long printTimes = 0;
-
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
@@ -870,7 +921,7 @@ public class CommitLog {
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
int flushPhysicQueueThoroughInterval =
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
+ CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
@@ -921,58 +972,22 @@ public class CommitLog {
CommitLog.log.info(this.getServiceName() + " service end");
}
-
@Override
public String getServiceName() {
return FlushRealTimeService.class.getSimpleName();
}
-
private void printFlushProgress() {
// CommitLog.log.info("how much disk fall behind memory, "
// + CommitLog.this.mappedFileQueue.howMuchFallBehind());
}
-
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}
- public static class GroupCommitRequest {
- private final long nextOffset;
- private final CountDownLatch countDownLatch = new CountDownLatch(1);
- private volatile boolean flushOK = false;
-
-
- public GroupCommitRequest(long nextOffset) {
- this.nextOffset = nextOffset;
- }
-
-
- public long getNextOffset() {
- return nextOffset;
- }
-
-
- public void wakeupCustomer(final boolean flushOK) {
- this.flushOK = flushOK;
- this.countDownLatch.countDown();
- }
-
-
- public boolean waitForFlush(long timeout) {
- try {
- this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
- return this.flushOK;
- } catch (InterruptedException e) {
- e.printStackTrace();
- return false;
- }
- }
- }
-
/**
* GroupCommit Service
*/
@@ -980,7 +995,6 @@ public class CommitLog {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
-
public void putRequest(final GroupCommitRequest request) {
synchronized (this) {
this.requestsWrite.add(request);
@@ -990,14 +1004,12 @@ public class CommitLog {
}
}
-
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
-
private void doCommit() {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
@@ -1028,7 +1040,6 @@ public class CommitLog {
}
}
-
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
@@ -1058,19 +1069,16 @@ public class CommitLog {
CommitLog.log.info(this.getServiceName() + " service end");
}
-
@Override
protected void onWaitEnd() {
this.swapRequests();
}
-
@Override
public String getServiceName() {
return GroupCommitService.class.getSimpleName();
}
-
@Override
public long getJointime() {
return 1000 * 60 * 5;
@@ -1090,19 +1098,16 @@ public class CommitLog {
private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
-
DefaultAppendMessageCallback(final int size) {
this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
this.maxMessageSize = size;
}
-
public ByteBuffer getMsgStoreItemMemory() {
return msgStoreItemMemory;
}
-
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
@@ -1143,9 +1148,9 @@ public class CommitLog {
* Serialize message
*/
final byte[] propertiesData =
- msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
- final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;
+ final short propertiesLength = propertiesData == null ? 0 : (short)propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
@@ -1162,7 +1167,7 @@ public class CommitLog {
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
- + ", maxMessageSize: " + this.maxMessageSize);
+ + ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
@@ -1180,7 +1185,7 @@ public class CommitLog {
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
- queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
@@ -1221,7 +1226,7 @@ public class CommitLog {
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
- this.msgStoreItemMemory.put((byte) topicLength);
+ this.msgStoreItemMemory.put((byte)topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort(propertiesLength);
@@ -1233,7 +1238,7 @@ public class CommitLog {
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
- msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
@@ -1250,46 +1255,9 @@ public class CommitLog {
return result;
}
-
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
}
}
-
- public long lockTimeMills() {
- long diff = 0;
- long begin = this.beginTimeInLock;
- if (begin > 0) {
- diff = this.defaultMessageStore.now() - begin;
- }
-
- if (diff < 0) {
- diff = 0;
- }
-
- return diff;
- }
-
- /**
- * Spin util acquired the lock.
- */
- private void lockForPutMessage() {
- if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
- putMessageNormalLock.lock();
- } else {
- boolean flag;
- do {
- flag = this.putMessageSpinLock.compareAndSet(true, false);
- } while (!flag);
- }
- }
-
- private void releasePutMessageLock() {
- if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
- putMessageNormalLock.unlock();
- } else {
- this.putMessageSpinLock.compareAndSet(false, true);
- }
- }
}