You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:30 UTC
[23/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
index c82cbdf..9cccaaf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
@@ -6,20 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Base class for background thread
*
@@ -32,20 +31,16 @@ public abstract class ServiceThread implements Runnable {
protected volatile boolean hasNotified = false;
protected volatile boolean stopped = false;
-
public ServiceThread() {
this.thread = new Thread(this, this.getServiceName());
}
-
public abstract String getServiceName();
-
public void start() {
this.thread.start();
}
-
public void shutdown() {
this.shutdown(false);
}
@@ -69,7 +64,7 @@ public abstract class ServiceThread implements Runnable {
this.thread.join(this.getJointime());
long eclipseTime = System.currentTimeMillis() - beginTime;
STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
- + this.getJointime());
+ + this.getJointime());
} catch (InterruptedException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java
index 72c5287..62a10a6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingCommandException.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.exception;
public class RemotingCommandException extends RemotingException {
private static final long serialVersionUID = -6061365915274953096L;
-
public RemotingCommandException(String message) {
super(message, null);
}
-
public RemotingCommandException(String message, Throwable cause) {
super(message, cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java
index 2fa4d69..c3e4777 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.exception;
public class RemotingConnectException extends RemotingException {
private static final long serialVersionUID = -5565366231695911316L;
-
public RemotingConnectException(String addr) {
this(addr, null);
}
-
public RemotingConnectException(String addr, Throwable cause) {
super("connect to <" + addr + "> failed", cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java
index f4a79ea..cbc363b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingException.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.exception;
public class RemotingException extends Exception {
private static final long serialVersionUID = -5690687334570505110L;
-
public RemotingException(String message) {
super(message);
}
-
public RemotingException(String message, Throwable cause) {
super(message, cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java
index 720ec1f..4eb1b63 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.exception;
public class RemotingSendRequestException extends RemotingException {
private static final long serialVersionUID = 5391285827332471674L;
-
public RemotingSendRequestException(String addr) {
this(addr, null);
}
-
public RemotingSendRequestException(String addr, Throwable cause) {
super("send request to <" + addr + "> failed", cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java
index 1190b49..e4cc69e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTimeoutException.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.exception;
@@ -20,17 +20,14 @@ public class RemotingTimeoutException extends RemotingException {
private static final long serialVersionUID = 4106899185095245979L;
-
public RemotingTimeoutException(String message) {
super(message);
}
-
public RemotingTimeoutException(String addr, long timeoutMillis) {
this(addr, timeoutMillis, null);
}
-
public RemotingTimeoutException(String addr, long timeoutMillis, Throwable cause) {
super("wait response on the channel <" + addr + "> timeout, " + timeoutMillis + "(ms)", cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
index 80d4418..8ec5cf6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
@@ -6,20 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.exception;
public class RemotingTooMuchRequestException extends RemotingException {
private static final long serialVersionUID = 4326919581254519654L;
-
public RemotingTooMuchRequestException(String message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index b797272..7c017eb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
@@ -53,97 +53,78 @@ public class NettyClientConfig {
return clientWorkerThreads;
}
-
public void setClientWorkerThreads(int clientWorkerThreads) {
this.clientWorkerThreads = clientWorkerThreads;
}
-
public int getClientOnewaySemaphoreValue() {
return clientOnewaySemaphoreValue;
}
-
public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
}
-
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}
-
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
}
-
public int getClientCallbackExecutorThreads() {
return clientCallbackExecutorThreads;
}
-
public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
}
-
public long getChannelNotActiveInterval() {
return channelNotActiveInterval;
}
-
public void setChannelNotActiveInterval(long channelNotActiveInterval) {
this.channelNotActiveInterval = channelNotActiveInterval;
}
-
public int getClientAsyncSemaphoreValue() {
return clientAsyncSemaphoreValue;
}
-
public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
}
-
public int getClientChannelMaxIdleTimeSeconds() {
return clientChannelMaxIdleTimeSeconds;
}
-
public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
}
-
public int getClientSocketSndBufSize() {
return clientSocketSndBufSize;
}
-
public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
this.clientSocketSndBufSize = clientSocketSndBufSize;
}
-
public int getClientSocketRcvBufSize() {
return clientSocketRcvBufSize;
}
-
public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
this.clientSocketRcvBufSize = clientSocketRcvBufSize;
}
-
public boolean isClientPooledByteBufAllocatorEnable() {
return clientPooledByteBufAllocatorEnable;
}
-
public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index 0a8ba97..73d7f2b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -6,47 +6,43 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-
-
/**
*
*/
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH = //
- Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
-
+ Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
-
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
- frame = (ByteBuf) super.decode(ctx, in);
+ frame = (ByteBuf)super.decode(ctx, in);
if (null == frame) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
index 35adcf2..fdebcdc 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
@@ -6,28 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-
-
/**
*
*/
@@ -36,7 +34,7 @@ public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
- throws Exception {
+ throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java
index e086409..825d1da 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEvent.java
@@ -6,47 +6,41 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.Channel;
-
public class NettyEvent {
private final NettyEventType type;
private final String remoteAddr;
private final Channel channel;
-
public NettyEvent(NettyEventType type, String remoteAddr, Channel channel) {
this.type = type;
this.remoteAddr = remoteAddr;
this.channel = channel;
}
-
public NettyEventType getType() {
return type;
}
-
public String getRemoteAddr() {
return remoteAddr;
}
-
public Channel getChannel() {
return channel;
}
-
@Override
public String toString() {
return "NettyEvent [type=" + type + ", remoteAddr=" + remoteAddr + ", channel=" + channel + "]";
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
index ae4b647..b2135da 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEventType.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 1034dd8..c0136d3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -16,25 +16,10 @@
*/
package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.ChannelEventListener;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
-import org.apache.rocketmq.remoting.common.ServiceThread;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
@@ -47,28 +32,37 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import org.apache.rocketmq.remoting.common.ServiceThread;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class NettyRemotingAbstract {
private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-
protected final Semaphore semaphoreOneway;
-
protected final Semaphore semaphoreAsync;
-
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
- new ConcurrentHashMap<Integer, ResponseFuture>(256);
+ new ConcurrentHashMap<Integer, ResponseFuture>(256);
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
- new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+ new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
-
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
@@ -133,14 +127,14 @@ public abstract class NettyRemotingAbstract {
}
} catch (Throwable e) {
if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
- .equals(e.getClass().getCanonicalName())) {
+ .equals(e.getClass().getCanonicalName())) {
PLOG.error("process request exception", e);
PLOG.error(cmd.toString());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
- RemotingHelper.exceptionSimpleDesc(e));
+ RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
@@ -150,7 +144,7 @@ public abstract class NettyRemotingAbstract {
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
- "[REJECTREQUEST]system busy, start flow control for a while");
+ "[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
@@ -162,14 +156,14 @@ public abstract class NettyRemotingAbstract {
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
- + ", too many requests and system thread pool busy, RejectedExecutionException " //
- + pair.getObject2().toString() //
- + " request code: " + cmd.getCode());
+ + ", too many requests and system thread pool busy, RejectedExecutionException " //
+ + pair.getObject2().toString() //
+ + " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
- "[OVERLOAD]system busy, start flow control for a while");
+ "[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
@@ -177,7 +171,7 @@ public abstract class NettyRemotingAbstract {
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
- RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
+ RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
@@ -267,7 +261,7 @@ public abstract class NettyRemotingAbstract {
}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+ throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
@@ -295,7 +289,7 @@ public abstract class NettyRemotingAbstract {
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
- responseFuture.getCause());
+ responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
@@ -308,8 +302,8 @@ public abstract class NettyRemotingAbstract {
}
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
- final InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ final InvokeCallback invokeCallback)
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final int opaque = request.getOpaque();
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
@@ -348,18 +342,18 @@ public abstract class NettyRemotingAbstract {
}
} else {
String info =
- String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
- timeoutMillis, //
- this.semaphoreAsync.getQueueLength(), //
- this.semaphoreAsync.availablePermits()//
- );
+ String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
+ timeoutMillis, //
+ this.semaphoreAsync.getQueueLength(), //
+ this.semaphoreAsync.availablePermits()//
+ );
PLOG.warn(info);
throw new RemotingTooMuchRequestException(info);
}
}
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
@@ -384,10 +378,10 @@ public abstract class NettyRemotingAbstract {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
- "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
- timeoutMillis, //
- this.semaphoreOneway.getQueueLength(), //
- this.semaphoreOneway.availablePermits()//
+ "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
+ timeoutMillis, //
+ this.semaphoreOneway.getQueueLength(), //
+ this.semaphoreOneway.availablePermits()//
);
PLOG.warn(info);
throw new RemotingTimeoutException(info);
@@ -399,7 +393,6 @@ public abstract class NettyRemotingAbstract {
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;
-
public void putNettyEvent(final NettyEvent event) {
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
@@ -408,7 +401,6 @@ public abstract class NettyRemotingAbstract {
}
}
-
@Override
public void run() {
PLOG.info(this.getServiceName() + " service started");
@@ -445,7 +437,6 @@ public abstract class NettyRemotingAbstract {
PLOG.info(this.getServiceName() + " service end");
}
-
@Override
public String getServiceName() {
return NettyEventExecuter.class.getSimpleName();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 3b7013a..db7815a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -16,18 +16,6 @@
*/
package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.ChannelEventListener;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
@@ -45,9 +33,6 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
@@ -64,7 +49,20 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -94,7 +92,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
- final ChannelEventListener channelEventListener) {
+ final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
@@ -107,7 +105,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
-
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
@@ -117,7 +114,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
-
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
@@ -134,36 +130,35 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
- nettyClientConfig.getClientWorkerThreads(), //
- new ThreadFactory() {
+ nettyClientConfig.getClientWorkerThreads(), //
+ new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
- }
- });
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
+ }
+ });
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE, false)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
- .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
- .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
- defaultEventExecutorGroup,
- new NettyEncoder(),
- new NettyDecoder(),
- new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
- new NettyConnetManageHandler(),
- new NettyClientHandler());
- }
- });
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, false)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+ .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+ .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(
+ defaultEventExecutorGroup,
+ new NettyEncoder(),
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+ new NettyConnetManageHandler(),
+ new NettyClientHandler());
+ }
+ });
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
@@ -233,7 +228,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
removeItemFromTable = false;
} else if (prevCW.getChannel() != channel) {
log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
- addrRemote);
+ addrRemote);
removeItemFromTable = false;
}
@@ -338,7 +333,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
- throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+ throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
@@ -431,7 +426,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return cw.getChannel();
}
-
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection = false;
@@ -476,7 +470,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
- channelFuture.toString());
+ channelFuture.toString());
}
}
@@ -485,8 +479,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
- throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
- RemotingSendRequestException {
+ throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
+ RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
@@ -507,7 +501,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
- RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
@@ -572,27 +566,22 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
static class ChannelWrapper {
private final ChannelFuture channelFuture;
-
public ChannelWrapper(ChannelFuture channelFuture) {
this.channelFuture = channelFuture;
}
-
public boolean isOK() {
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
-
public boolean isWriteable() {
return this.channelFuture.channel().isWritable();
}
-
private Channel getChannel() {
return this.channelFuture.channel();
}
-
public ChannelFuture getChannelFuture() {
return channelFuture;
}
@@ -610,7 +599,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
class NettyConnetManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)
- throws Exception {
+ throws Exception {
final String local = localAddress == null ? "UNKNOW" : localAddress.toString();
final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString();
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
@@ -621,7 +610,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
-
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
@@ -634,7 +622,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
-
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
@@ -650,14 +637,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
- IdleStateEvent evnet = (IdleStateEvent) evt;
+ IdleStateEvent evnet = (IdleStateEvent)evt;
if (evnet.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
- .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
+ .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index c6e2eda..f109086 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -6,27 +6,16 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.ChannelEventListener;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.RemotingServer;
-import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
@@ -45,9 +34,6 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.InetSocketAddress;
import java.util.Timer;
import java.util.TimerTask;
@@ -55,7 +41,19 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -72,15 +70,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private RPCHook rpcHook;
-
private int port = 0;
-
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
-
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
@@ -95,7 +90,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
-
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
@@ -105,7 +99,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
-
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
@@ -113,12 +106,11 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
});
if (RemotingUtil.isLinuxPlatform() //
- && nettyServerConfig.isUseEpollNativeSelector()) {
+ && nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
-
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
@@ -129,7 +121,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
-
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
@@ -138,51 +129,49 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
-
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
- nettyServerConfig.getServerWorkerThreads(), //
- new ThreadFactory() {
+ nettyServerConfig.getServerWorkerThreads(), //
+ new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
+ }
+ });
+ ServerBootstrap childHandler =
+ this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 1024)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_KEEPALIVE, false)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
+ .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
+ .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
@Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(
+ defaultEventExecutorGroup,
+ new NettyEncoder(),
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ new NettyConnetManageHandler(),
+ new NettyServerHandler());
}
});
- ServerBootstrap childHandler =
- this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.SO_KEEPALIVE, false)
- .childOption(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
- .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
- .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
- defaultEventExecutorGroup,
- new NettyEncoder(),
- new NettyDecoder(),
- new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- new NettyConnetManageHandler(),
- new NettyServerHandler());
- }
- });
-
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
- InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
+ InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
@@ -269,19 +258,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+ throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
@Override
public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
- RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}
@@ -316,7 +305,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
super.channelRegistered(ctx);
}
-
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
@@ -324,7 +312,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
super.channelUnregistered(ctx);
}
-
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
@@ -336,7 +323,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
-
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
@@ -348,18 +334,17 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
-
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
- IdleStateEvent evnet = (IdleStateEvent) evt;
+ IdleStateEvent evnet = (IdleStateEvent)evt;
if (evnet.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
- .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
+ .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
}
}
}
@@ -367,7 +352,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
ctx.fireUserEventTriggered(evt);
}
-
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
index b8d2052..c6251e9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
-
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/**
* Common remoting command processor
@@ -27,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
*/
public interface NettyRequestProcessor {
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws Exception;
+ throws Exception;
+
boolean rejectRequest();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index 0a53240..f69fded 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
@@ -43,118 +43,96 @@ public class NettyServerConfig implements Cloneable {
*/
private boolean useEpollNativeSelector = false;
-
public int getListenPort() {
return listenPort;
}
-
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
-
public int getServerWorkerThreads() {
return serverWorkerThreads;
}
-
public void setServerWorkerThreads(int serverWorkerThreads) {
this.serverWorkerThreads = serverWorkerThreads;
}
-
public int getServerSelectorThreads() {
return serverSelectorThreads;
}
-
public void setServerSelectorThreads(int serverSelectorThreads) {
this.serverSelectorThreads = serverSelectorThreads;
}
-
public int getServerOnewaySemaphoreValue() {
return serverOnewaySemaphoreValue;
}
-
public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
}
-
public int getServerCallbackExecutorThreads() {
return serverCallbackExecutorThreads;
}
-
public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
}
-
public int getServerAsyncSemaphoreValue() {
return serverAsyncSemaphoreValue;
}
-
public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
}
-
public int getServerChannelMaxIdleTimeSeconds() {
return serverChannelMaxIdleTimeSeconds;
}
-
public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
}
-
public int getServerSocketSndBufSize() {
return serverSocketSndBufSize;
}
-
public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
this.serverSocketSndBufSize = serverSocketSndBufSize;
}
-
public int getServerSocketRcvBufSize() {
return serverSocketRcvBufSize;
}
-
public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
this.serverSocketRcvBufSize = serverSocketRcvBufSize;
}
-
public boolean isServerPooledByteBufAllocatorEnable() {
return serverPooledByteBufAllocatorEnable;
}
-
public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
}
-
public boolean isUseEpollNativeSelector() {
return useEpollNativeSelector;
}
-
public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
this.useEpollNativeSelector = useEpollNativeSelector;
}
@Override
public Object clone() throws CloneNotSupportedException {
- return (NettyServerConfig) super.clone();
+ return (NettyServerConfig)super.clone();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index ca22df1..9409f92 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -19,24 +19,24 @@ package org.apache.rocketmq.remoting.netty;
public class NettySystemConfig {
public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
- "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
+ "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = //
- "com.rocketmq.remoting.socket.sndbuf.size";
+ "com.rocketmq.remoting.socket.sndbuf.size";
public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = //
- "com.rocketmq.remoting.socket.rcvbuf.size";
+ "com.rocketmq.remoting.socket.rcvbuf.size";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = //
- "com.rocketmq.remoting.clientAsyncSemaphoreValue";
+ "com.rocketmq.remoting.clientAsyncSemaphoreValue";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = //
- "com.rocketmq.remoting.clientOnewaySemaphoreValue";
+ "com.rocketmq.remoting.clientOnewaySemaphoreValue";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
- Boolean
- .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
- public static int socketSndbufSize = //
- Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
- public static int socketRcvbufSize = //
- Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
+ Boolean
+ .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
- Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = //
- Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
+ public static int socketSndbufSize = //
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
+ public static int socketRcvbufSize = //
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java
index e1317a0..0443b43 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java
@@ -17,9 +17,8 @@
package org.apache.rocketmq.remoting.netty;
-
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class RequestTask implements Runnable {
private final Runnable runnable;
@@ -37,7 +36,7 @@ public class RequestTask implements Runnable {
@Override
public int hashCode() {
int result = runnable != null ? runnable.hashCode() : 0;
- result = 31 * result + (int) (getCreateTimestamp() ^ (getCreateTimestamp() >>> 32));
+ result = 31 * result + (int)(getCreateTimestamp() ^ (getCreateTimestamp() >>> 32));
result = 31 * result + (channel != null ? channel.hashCode() : 0);
result = 31 * result + (request != null ? request.hashCode() : 0);
result = 31 * result + (isStopRun() ? 1 : 0);
@@ -46,14 +45,19 @@ public class RequestTask implements Runnable {
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (!(o instanceof RequestTask)) return false;
+ if (this == o)
+ return true;
+ if (!(o instanceof RequestTask))
+ return false;
- final RequestTask that = (RequestTask) o;
+ final RequestTask that = (RequestTask)o;
- if (getCreateTimestamp() != that.getCreateTimestamp()) return false;
- if (isStopRun() != that.isStopRun()) return false;
- if (channel != null ? !channel.equals(that.channel) : that.channel != null) return false;
+ if (getCreateTimestamp() != that.getCreateTimestamp())
+ return false;
+ if (isStopRun() != that.isStopRun())
+ return false;
+ if (channel != null ? !channel.equals(that.channel) : that.channel != null)
+ return false;
return request != null ? request.getOpaque() == that.request.getOpaque() : that.request == null;
}
@@ -72,7 +76,8 @@ public class RequestTask implements Runnable {
@Override
public void run() {
- if (!this.stopRun) this.runnable.run();
+ if (!this.stopRun)
+ this.runnable.run();
}
public void returnResponse(int code, String remark) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index d564a3a..fa792b2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -6,24 +6,22 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ResponseFuture {
private final int opaque;
@@ -39,16 +37,14 @@ public class ResponseFuture {
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
-
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
- SemaphoreReleaseOnlyOnce once) {
+ SemaphoreReleaseOnlyOnce once) {
this.opaque = opaque;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.once = once;
}
-
public void executeInvokeCallback() {
if (invokeCallback != null) {
if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
@@ -57,87 +53,72 @@ public class ResponseFuture {
}
}
-
public void release() {
if (this.once != null) {
this.once.release();
}
}
-
public boolean isTimeout() {
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
-
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
-
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
-
public long getBeginTimestamp() {
return beginTimestamp;
}
-
public boolean isSendRequestOK() {
return sendRequestOK;
}
-
public void setSendRequestOK(boolean sendRequestOK) {
this.sendRequestOK = sendRequestOK;
}
-
public long getTimeoutMillis() {
return timeoutMillis;
}
-
public InvokeCallback getInvokeCallback() {
return invokeCallback;
}
-
public Throwable getCause() {
return cause;
}
-
public void setCause(Throwable cause) {
this.cause = cause;
}
-
public RemotingCommand getResponseCommand() {
return responseCommand;
}
-
public void setResponseCommand(RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
}
-
public int getOpaque() {
return opaque;
}
-
@Override
public String toString() {
return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
- + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
- + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
- + ", countDownLatch=" + countDownLatch + "]";
+ + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
+ + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
+ + ", countDownLatch=" + countDownLatch + "]";
}
}