You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/19 07:23:50 UTC
[6/8] incubator-rocketmq git commit: initialize RocketMQ5
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
new file mode 100644
index 0000000..bbd33ea
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.channel;
+
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+
+/**
+ * {@link ChannelHandlerContextWrapper} implementation that carries an
+ * {@code io.netty.channel.ChannelHandlerContext}.
+ *
+ * <p>NOTE(review): the type parameter shares its simple name with Netty's
+ * {@code ChannelHandlerContext}. Inside this class the simple name refers to the type
+ * parameter, which is why the Netty type is always written fully-qualified below. The type
+ * parameter itself is not used by any member of this class; it is kept so that existing
+ * parameterized references keep compiling.
+ *
+ * @param <ChannelHandlerContext> unused type parameter (see note above)
+ */
+public class ChannelHandlerContextWrapperImpl<ChannelHandlerContext> implements ChannelHandlerContextWrapper {
+
+    /** The wrapped Netty context; assigned once by the constructor and never replaced. */
+    private final io.netty.channel.ChannelHandlerContext context;
+
+    /**
+     * @param context the Netty handler context to wrap; stored as given (no null check)
+     */
+    public ChannelHandlerContextWrapperImpl(io.netty.channel.ChannelHandlerContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @return the Netty handler context passed to the constructor
+     */
+    public io.netty.channel.ChannelHandlerContext getContext() {
+        return context;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
new file mode 100644
index 0000000..b90afc1
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.channel;
+
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+import org.apache.rocketmq.remoting.api.channel.ChunkRegion;
+
+/**
+ * Exposes a {@link ChunkRegion} through Netty's {@link FileRegion} interface.
+ *
+ * <p>All positional and transfer queries are forwarded to the wrapped region. Reference
+ * counting is inherited from {@link AbstractReferenceCounted}; the wrapped region is released
+ * in {@link #deallocate()}.
+ */
+public class FileRegionImpl extends AbstractReferenceCounted implements FileRegion {
+    /** Region that every query and transfer is forwarded to. */
+    private final ChunkRegion delegate;
+
+    /**
+     * @param chunkRegion the region to adapt; stored as given (no null check)
+     */
+    public FileRegionImpl(ChunkRegion chunkRegion) {
+        delegate = chunkRegion;
+    }
+
+    /** @return the wrapped region's {@code position()} */
+    @Override
+    public long position() {
+        return delegate.position();
+    }
+
+    /** @return the wrapped region's {@code count()} */
+    @Override
+    public long count() {
+        return delegate.count();
+    }
+
+    /** @return the wrapped region's {@code transferred()} */
+    @Override
+    public long transferred() {
+        return delegate.transferred();
+    }
+
+    /**
+     * Variant of {@link #transferred()} under Netty's alternative spelling of the method name.
+     *
+     * @return the wrapped region's {@code transferred()}
+     */
+    @Override
+    public long transfered() {
+        return delegate.transferred();
+    }
+
+    /**
+     * Forwards the transfer to the wrapped region.
+     *
+     * @param target   channel to write into
+     * @param position passed through unchanged to the wrapped region
+     * @return whatever the wrapped region's {@code transferTo} returns
+     * @throws IOException if the wrapped region's transfer fails
+     */
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        return delegate.transferTo(target, position);
+    }
+
+    /** Increments the reference count by one and returns this instance. */
+    @Override
+    public FileRegion retain() {
+        super.retain();
+        return this;
+    }
+
+    /** Increments the reference count by {@code increment} and returns this instance. */
+    @Override
+    public FileRegion retain(int increment) {
+        super.retain(increment);
+        return this;
+    }
+
+    /** No-op; returns this instance. */
+    @Override
+    public FileRegion touch() {
+        return this;
+    }
+
+    /** No-op; the hint is ignored and this instance is returned. */
+    @Override
+    public FileRegion touch(Object hint) {
+        return this;
+    }
+
+    /** Releases the wrapped region. */
+    @Override
+    protected void deallocate() {
+        delegate.release();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
new file mode 100644
index 0000000..ba4a969
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.channel;
+
+import io.netty.channel.Channel;
+import java.net.SocketAddress;
+import org.apache.rocketmq.remoting.api.channel.ChunkRegion;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+
+/**
+ * {@link RemotingChannel} backed by a Netty {@link Channel}.
+ *
+ * <p>Every operation is forwarded to the wrapped channel. Two instances are equal when they
+ * wrap equal channels.
+ */
+public class NettyChannelImpl implements RemotingChannel {
+    /** The Netty channel all calls are forwarded to. */
+    private final Channel channel;
+
+    /**
+     * @param channel the Netty channel to wrap; stored as given (no null check)
+     */
+    public NettyChannelImpl(Channel channel) {
+        this.channel = channel;
+    }
+
+    /** @return the wrapped Netty channel */
+    public Channel getChannel() {
+        return channel;
+    }
+
+    /** @return the wrapped channel's local address */
+    @Override
+    public SocketAddress localAddress() {
+        return channel.localAddress();
+    }
+
+    /** @return the wrapped channel's remote address */
+    @Override
+    public SocketAddress remoteAddress() {
+        return channel.remoteAddress();
+    }
+
+    /** @return the wrapped channel's {@code isWritable()} */
+    @Override
+    public boolean isWritable() {
+        return channel.isWritable();
+    }
+
+    /** @return the wrapped channel's {@code isActive()} */
+    @Override
+    public boolean isActive() {
+        return channel.isActive();
+    }
+
+    /** Calls {@code close()} on the wrapped channel; the returned future is not observed. */
+    @Override
+    public void close() {
+        channel.close();
+    }
+
+    /**
+     * Writes and flushes the command on the wrapped channel; the returned future is not observed.
+     *
+     * @param command the command to send
+     */
+    @Override
+    public void reply(final RemotingCommand command) {
+        channel.writeAndFlush(command);
+    }
+
+    /**
+     * Writes and flushes the region object on the wrapped channel; the returned future is not
+     * observed.
+     *
+     * @param fileRegion the region to send, handed to the channel as-is
+     */
+    @Override
+    public void reply(final ChunkRegion fileRegion) {
+        channel.writeAndFlush(fileRegion);
+    }
+
+    /** Equality is based on exact class and on the wrapped channel. */
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        final NettyChannelImpl other = (NettyChannelImpl) o;
+        if (channel == null) {
+            return other.channel == null;
+        }
+        return channel.equals(other.channel);
+    }
+
+    /** @return the wrapped channel's hash code, or 0 when no channel is set */
+    @Override
+    public int hashCode() {
+        if (channel == null) {
+            return 0;
+        }
+        return channel.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "NettyChannelImpl [channel=" + channel + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
new file mode 100644
index 0000000..44d4fd9
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Map.Entry;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
+
+/**
+ * Static helpers that translate a {@link RemotingCommand} to and from its binary layout.
+ *
+ * <p>Header layout written by {@link #encodeHeader}, in order: protocol type (1 byte),
+ * total length (4), request id (4), serializer type (1), traffic type (1),
+ * code length (2) + code bytes, remark length (2) + remark bytes, property count (2) followed
+ * by one (length (2) + bytes) pair per property, and finally the parameter length (4).
+ * Strings are encoded with {@link #REMOTING_CHARSET}; a property is encoded as
+ * {@code key + PROPERTY_SEPARATOR + value}.
+ */
+public class CodecHelper {
+    //ProtocolType + TotalLength + RequestId + SerializeType + TrafficType + CodeLength + RemarkLength + PropertiesSize + ParameterLength
+    public final static int MIN_PROTOCOL_LEN = 1 + 4 + 4 + 1 + 1 + 2 + 2 + 2 + 4;
+    public final static char PROPERTY_SEPARATOR = '\n';
+    public final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
+
+    public final static int CODE_MAX_LEN = 512;
+    public final static int PARAMETER_MAX_LEN = 33554432;
+    public final static int BODY_MAX_LEN = 33554432;
+    public final static int PACKET_MAX_LEN = 33554432;
+
+    /**
+     * Encodes the header of {@code command} into a new heap buffer.
+     *
+     * <p>The parameter bytes and the extra payload are NOT written into the returned buffer;
+     * only their sizes are accounted for in the total-length field (and the parameter size in
+     * the trailing parameter-length field).
+     *
+     * @param command         the command whose header is encoded
+     * @param parameterLength number of serialized parameter bytes that follow the header
+     * @param extraPayload    number of extra payload bytes that follow the parameter bytes
+     * @return a buffer, already flipped for reading, that holds exactly the header
+     * @throws RemoteCodecException if the code, the remark, a single property or the property
+     *                              count is too large for its 2-byte length field
+     */
+    public static ByteBuffer encodeHeader(final RemotingCommand command, final int parameterLength,
+        final int extraPayload) {
+        byte[] code = command.opCode().getBytes(REMOTING_CHARSET);
+        int codeLength = code.length;
+        checkFitsShort("Code length", codeLength);
+
+        // A null remark (e.g. taken from an exception without a message) is sent as empty
+        // instead of failing with a NullPointerException.
+        String remarkText = command.remark();
+        byte[] remark = remarkText != null ? remarkText.getBytes(REMOTING_CHARSET) : new byte[0];
+        int remarkLength = remark.length;
+        checkFitsShort("Remark length", remarkLength);
+
+        byte[][] props = null;
+        int propsLength = 0;
+        StringBuilder sb = new StringBuilder();
+        if (!command.properties().isEmpty()) {
+            checkFitsShort("Properties size", command.properties().size());
+            props = new byte[command.properties().size()][];
+            int i = 0;
+            for (Entry<String, String> next : command.properties().entrySet()) {
+                sb.setLength(0);
+                sb.append(next.getKey());
+                sb.append(PROPERTY_SEPARATOR);
+                sb.append(next.getValue());
+
+                props[i] = sb.toString().getBytes(REMOTING_CHARSET);
+                checkFitsShort("Property length", props[i].length);
+
+                // 2 bytes for the per-property length prefix, then the bytes themselves.
+                propsLength += 2;
+                propsLength += props[i].length;
+                i++;
+            }
+        }
+
+        // The total-length field counts everything that follows the protocol-type (1) and
+        // total-length (4) fields, including the parameter and extra payload bytes.
+        int totalLength = MIN_PROTOCOL_LEN - 1 - 4
+            + codeLength
+            + remarkLength
+            + propsLength
+            + parameterLength
+            + extraPayload;
+
+        // Only the header is materialized here, so the two trailing sections are subtracted.
+        int headerLength = 1 + 4 + totalLength - parameterLength - extraPayload;
+
+        ByteBuffer buf = ByteBuffer.allocate(headerLength);
+        buf.put(command.protocolType());
+        buf.putInt(totalLength);
+        buf.putInt(command.requestID());
+        buf.put(command.serializerType());
+        buf.put((byte) command.trafficType().ordinal());
+
+        buf.putShort((short) codeLength);
+        if (codeLength > 0) {
+            buf.put(code);
+        }
+        buf.putShort((short) remarkLength);
+        if (remarkLength > 0) {
+            buf.put(remark);
+        }
+        if (props != null) {
+            buf.putShort((short) props.length);
+            for (byte[] prop : props) {
+                buf.putShort((short) prop.length);
+                buf.put(prop);
+            }
+        } else {
+            buf.putShort((short) 0);
+        }
+
+        buf.putInt(parameterLength);
+
+        buf.flip();
+
+        return buf;
+    }
+
+    /**
+     * Rejects a length that cannot be stored in a signed 2-byte field. Without this check the
+     * {@code (short)} cast would silently wrap and the receiver, which reads the field with
+     * {@code getShort()}, would misparse the rest of the frame.
+     */
+    private static void checkFitsShort(final String field, final int length) {
+        if (length > Short.MAX_VALUE) {
+            throw new RemoteCodecException(String.format("%s: %d over max limit: %d", field, length, Short.MAX_VALUE));
+        }
+    }
+
+    /**
+     * Decodes a command from {@code byteBuffer}, reading from its current position.
+     *
+     * <p>Fields are read in this order: request id, serializer type, traffic type, code,
+     * remark, properties, parameter bytes. Whatever remains up to the buffer's limit becomes
+     * the extra payload. The protocol-type and total-length fields written by
+     * {@link #encodeHeader} are not read here.
+     * NOTE(review): presumably the caller consumes those two fields first — confirm.
+     *
+     * @param byteBuffer buffer holding one encoded command
+     * @return the decoded command
+     * @throws RemoteCodecException if the code length is not within 1..{@link #CODE_MAX_LEN},
+     *                              or the parameter or body size is negative or above its limit
+     */
+    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+        RemotingCommandImpl cmd = new RemotingCommandImpl();
+        int totalLength = byteBuffer.limit();
+        cmd.requestID(byteBuffer.getInt());
+        cmd.serializerType(byteBuffer.get());
+        cmd.trafficType(TrafficType.parse(byteBuffer.get()));
+
+        // Code: mandatory. A zero or negative length is rejected with the same message as an
+        // oversized one.
+        {
+            short size = byteBuffer.getShort();
+            if (size > 0 && size <= CODE_MAX_LEN) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                String str = new String(bytes, REMOTING_CHARSET);
+                cmd.opCode(str);
+            } else {
+                throw new RemoteCodecException(String.format("Code length: %d over max limit: %d", size, CODE_MAX_LEN));
+            }
+        }
+
+        // Remark: optional. A non-positive length leaves the command's default remark in place.
+        {
+            short size = byteBuffer.getShort();
+            if (size > 0) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                String str = new String(bytes, REMOTING_CHARSET);
+                cmd.remark(str);
+            }
+        }
+
+        // Properties: a count followed by length-prefixed "key<separator>value" entries.
+        {
+            short size = byteBuffer.getShort();
+            if (size > 0) {
+                for (int i = 0; i < size; i++) {
+                    short length = byteBuffer.getShort();
+                    if (length > 0) {
+                        byte[] bytes = new byte[length];
+                        byteBuffer.get(bytes);
+                        String str = new String(bytes, REMOTING_CHARSET);
+                        // Split on the first separator only; entries without a separator or
+                        // with an empty key are dropped.
+                        int index = str.indexOf(PROPERTY_SEPARATOR);
+                        if (index > 0) {
+                            String key = str.substring(0, index);
+                            String value = str.substring(index + 1);
+                            cmd.property(key, value);
+                        }
+                    }
+                }
+            }
+        }
+
+        // Parameter bytes: zero means "none"; negative or oversized values are rejected.
+        {
+            int size = byteBuffer.getInt();
+            if (size > 0 && size <= PARAMETER_MAX_LEN) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                cmd.parameterBytes(bytes);
+            } else if (size != 0) {
+                throw new RemoteCodecException(String.format("Parameter size: %d over max limit: %d", size, PARAMETER_MAX_LEN));
+            }
+        }
+
+        // Extra payload: everything between the current position and the buffer's limit.
+        {
+            int size = totalLength - byteBuffer.position();
+            if (size > 0 && size <= BODY_MAX_LEN) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                cmd.extraPayload(bytes);
+            } else if (size != 0) {
+                throw new RemoteCodecException(String.format("Body size: %d over max limit: %d", size, BODY_MAX_LEN));
+            }
+        }
+
+        return cmd;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
new file mode 100644
index 0000000..f5d2126
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+
+/**
+ * {@link RemotingCommandFactory} that produces {@link RemotingCommandImpl} instances.
+ *
+ * <p>Requests take their protocol and serializer type from the configured
+ * {@link RemotingCommandFactoryMeta}; responses copy them from the request being answered.
+ */
+public class RemotingCommandFactoryImpl implements RemotingCommandFactory {
+    /** Supplies the protocol type and serializer type stamped on new requests. */
+    private RemotingCommandFactoryMeta remotingCommandFactoryMeta;
+
+    /** Creates a factory configured with a default-constructed {@link RemotingCommandFactoryMeta}. */
+    public RemotingCommandFactoryImpl() {
+        this(new RemotingCommandFactoryMeta());
+    }
+
+    /**
+     * @param remotingCommandFactoryMeta settings applied to every request this factory creates
+     */
+    public RemotingCommandFactoryImpl(final RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
+        this.remotingCommandFactoryMeta = remotingCommandFactoryMeta;
+    }
+
+    /**
+     * @return a new command carrying the factory's protocol type and serializer type
+     */
+    @Override
+    public RemotingCommand createRequest() {
+        final RemotingCommand cmd = new RemotingCommandImpl();
+        cmd.protocolType(remotingCommandFactoryMeta.getProtocolType());
+        cmd.serializerType(remotingCommandFactoryMeta.getSerializeType());
+        return cmd;
+    }
+
+    /**
+     * Builds the response for {@code command}: the request id, protocol type and serializer
+     * type are copied from it and the traffic type is set to {@link TrafficType#RESPONSE}.
+     *
+     * @param command the request being answered
+     * @return a new response command
+     */
+    @Override
+    public RemotingCommand createResponse(final RemotingCommand command) {
+        final RemotingCommand reply = new RemotingCommandImpl();
+        reply.requestID(command.requestID());
+        reply.protocolType(command.protocolType());
+        reply.serializerType(command.serializerType());
+        reply.trafficType(TrafficType.RESPONSE);
+        return reply;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
new file mode 100644
index 0000000..bcf2338
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+
+/**
+ * Plain mutable implementation of {@link RemotingCommand}.
+ *
+ * <p>{@code hashCode}, {@code equals} and {@code toString} are reflection-based, so they
+ * depend on the names and declaration order of the instance fields below.
+ */
+public class RemotingCommandImpl implements RemotingCommand {
+    public final static RequestIdGenerator REQUEST_ID_GENERATOR = RequestIdGenerator.inst;
+
+    private byte protocolType;
+    private byte serializeType;
+
+    /** Taken from {@link #REQUEST_ID_GENERATOR} at construction; may be overwritten later. */
+    private volatile int requestId = REQUEST_ID_GENERATOR.incrementAndGet();
+    private TrafficType trafficType = TrafficType.REQUEST_SYNC;
+    private String code = CommandFlag.SUCCESS.flag();
+    private String remark = "";
+    private Map<String, String> properties = new HashMap<String, String>();
+    /** Parameter in object form; also used as the cache for the decoding accessors. */
+    private Object parameter;
+    private byte[] extraPayload;
+
+    /** Parameter in serialized form. */
+    private byte[] parameterByte;
+
+    protected RemotingCommandImpl() {
+    }
+
+    // ---- header fields ----
+
+    @Override
+    public byte protocolType() {
+        return protocolType;
+    }
+
+    @Override
+    public void protocolType(byte value) {
+        protocolType = value;
+    }
+
+    @Override
+    public int requestID() {
+        return requestId;
+    }
+
+    @Override
+    public void requestID(int value) {
+        requestId = value;
+    }
+
+    @Override
+    public byte serializerType() {
+        return serializeType;
+    }
+
+    @Override
+    public void serializerType(byte value) {
+        serializeType = value;
+    }
+
+    @Override
+    public TrafficType trafficType() {
+        return trafficType;
+    }
+
+    @Override
+    public void trafficType(TrafficType value) {
+        trafficType = value;
+    }
+
+    @Override
+    public String opCode() {
+        return code;
+    }
+
+    @Override
+    public void opCode(String value) {
+        code = value;
+    }
+
+    @Override
+    public String remark() {
+        return remark;
+    }
+
+    @Override
+    public void remark(String value) {
+        remark = value;
+    }
+
+    // ---- properties ----
+
+    /** @return the live property map (not a copy) */
+    @Override
+    public Map<String, String> properties() {
+        return properties;
+    }
+
+    /** Replaces the property map with {@code value} (the reference is stored, not copied). */
+    @Override
+    public void properties(Map<String, String> value) {
+        properties = value;
+    }
+
+    /** @return the value stored under {@code key} in the property map */
+    @Override
+    public String property(String key) {
+        return properties.get(key);
+    }
+
+    /** Stores {@code value} under {@code key} in the property map. */
+    @Override
+    public void property(String key, String value) {
+        properties.put(key, value);
+    }
+
+    // ---- parameter and payload ----
+
+    @Override
+    public Object parameter() {
+        return parameter;
+    }
+
+    @Override
+    public void parameter(Object value) {
+        parameter = value;
+    }
+
+    /** @return the serialized parameter, obtained through {@link #getParameterByte()} */
+    @Override
+    public byte[] parameterBytes() {
+        return getParameterByte();
+    }
+
+    /** Sets the serialized parameter through {@link #setParameterByte(byte[])}. */
+    @Override
+    public void parameterBytes(byte[] value) {
+        setParameterByte(value);
+    }
+
+    public byte[] getParameterByte() {
+        return parameterByte;
+    }
+
+    public void setParameterByte(byte[] parameterByte) {
+        this.parameterByte = parameterByte;
+    }
+
+    @Override
+    public byte[] extraPayload() {
+        return extraPayload;
+    }
+
+    @Override
+    public void extraPayload(byte[] value) {
+        extraPayload = value;
+    }
+
+    /**
+     * Returns the parameter as {@code T}. When an object parameter is already set it is
+     * returned with an unchecked cast; otherwise the serialized parameter is decoded with the
+     * serializer selected by {@link #serializerType()}, stored as the object parameter and
+     * returned.
+     *
+     * @param serializerFactory source of the serializer used for decoding
+     * @param c                 target class passed to the serializer
+     * @return the cached or freshly decoded parameter
+     */
+    @Override
+    public <T> T parameter(SerializerFactory serializerFactory, Class<T> c) {
+        if (parameter() == null) {
+            final T decoded = serializerFactory.get(serializerType()).decode(parameterBytes(), c);
+            parameter(decoded);
+            return decoded;
+        }
+        return (T) parameter();
+    }
+
+    /**
+     * Same as {@link #parameter(SerializerFactory, Class)} but the target type is described by
+     * a {@link TypePresentation}.
+     */
+    @Override
+    public <T> T parameter(SerializerFactory serializerFactory, TypePresentation<T> typePresentation) {
+        if (parameter() == null) {
+            final T decoded = serializerFactory.get(serializerType()).decode(parameterBytes(), typePresentation);
+            parameter(decoded);
+            return decoded;
+        }
+        return (T) parameter();
+    }
+
+    /**
+     * Same as {@link #parameter(SerializerFactory, Class)} but the target type is described by
+     * a reflective {@link Type}.
+     */
+    @Override
+    public <T> T parameter(SerializerFactory serializerFactory, Type type) {
+        if (parameter() == null) {
+            final T decoded = serializerFactory.get(serializerType()).decode(parameterBytes(), type);
+            parameter(decoded);
+            return decoded;
+        }
+        return (T) parameter();
+    }
+
+    // ---- reflection-based Object methods ----
+
+    @Override
+    public boolean equals(Object o) {
+        return EqualsBuilder.reflectionEquals(this, o);
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this);
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java
new file mode 100644
index 0000000..9b85c95
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Singleton source of request ids, backed by an {@link AtomicInteger} that starts at 0.
+ */
+public class RequestIdGenerator {
+    /** The single shared instance; {@code final} so it cannot be reassigned or nulled by other code. */
+    public static final RequestIdGenerator inst = new RequestIdGenerator();
+
+    /** Counter behind {@link #incrementAndGet()}. */
+    private final AtomicInteger generator = new AtomicInteger(0);
+
+    /** Not instantiable from outside; use {@link #inst}. */
+    private RequestIdGenerator() {
+
+    }
+
+    /**
+     * Atomically increments the counter and returns the new value, so the first call returns 1.
+     *
+     * @return the incremented counter value
+     */
+    public int incrementAndGet() {
+        return generator.incrementAndGet();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
new file mode 100644
index 0000000..ec9cece
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.channel.Channel;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Immutable record of something that happened on a Netty {@link Channel}: the event type, the
+ * channel concerned and an optional cause.
+ *
+ * <p>{@link #toString()} is reflection-based, so its output depends on the names and
+ * declaration order of the fields below.
+ */
+public class NettyChannelEvent {
+    private final Channel channel;
+    private final NettyChannelEventType type;
+    /** May be {@code null}; the two-argument constructor always leaves it unset. */
+    private final Throwable cause;
+
+    /**
+     * Creates an event with an explicit cause.
+     *
+     * @param type    kind of event
+     * @param channel channel the event refers to
+     * @param cause   throwable associated with the event, or {@code null}
+     */
+    public NettyChannelEvent(NettyChannelEventType type, Channel channel, Throwable cause) {
+        this.type = type;
+        this.channel = channel;
+        this.cause = cause;
+    }
+
+    /**
+     * Creates an event without a cause.
+     *
+     * @param type    kind of event
+     * @param channel channel the event refers to
+     */
+    public NettyChannelEvent(NettyChannelEventType type, Channel channel) {
+        this(type, channel, null);
+    }
+
+    /** @return the channel the event refers to */
+    public Channel getChannel() {
+        return channel;
+    }
+
+    /** @return the kind of event */
+    public NettyChannelEventType getType() {
+        return type;
+    }
+
+    /** @return the associated throwable, or {@code null} when none was supplied */
+    public Throwable getCause() {
+        return cause;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
new file mode 100644
index 0000000..1bf2277
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+/**
+ * Kinds of event that a {@code NettyChannelEvent} can carry.
+ *
+ * <p>NOTE(review): the per-constant descriptions below are inferred from the constant names
+ * only; confirm them against the handler that emits these events.
+ */
+public enum NettyChannelEventType {
+    /** Presumably: the channel became active. */
+    ACTIVE,
+    /** Presumably: the channel became inactive. */
+    INACTIVE,
+    /** Presumably: the channel was reported idle. */
+    IDLE,
+    /** Presumably: an exception was raised on the channel. */
+    EXCEPTION
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
new file mode 100644
index 0000000..1af62cb
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingEndPoint;
+import org.apache.rocketmq.remoting.api.RemotingService;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
+import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
+import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+import org.apache.rocketmq.remoting.common.ResponseResult;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
+import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
+import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
+import org.apache.rocketmq.remoting.internal.UIDGenerator;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class NettyRemotingAbstract implements RemotingService {
+ // Logger shared by this class and its subclasses.
+ protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class);
+ protected final ProtocolFactory protocolFactory = new ProtocolFactoryImpl();
+ protected final SerializerFactory serializerFactory = new SerializerFactoryImpl();
+ // Thread that hands queued channel events to the registered channel event listeners.
+ protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
+ // Permit pools acquired before a oneway / async request is written (see invokeOneway0 / invokeAsync0).
+ private final Semaphore semaphoreOneway;
+ private final Semaphore semaphoreAsync;
+ // Requests that were sent and still wait for their response, keyed by request id.
+ private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
+ // Request code -> (processor, executor the processor runs on).
+ private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>();
+ // Number of matched responses; used to emit a statistics log line every 5000 responses.
+ private final AtomicLong count = new AtomicLong(0);
+ private final RemotingCommandFactory remotingCommandFactory;
+ private final String remotingInstanceId = UIDGenerator.instance().createUID();
+
+ // Default executor for request processors registered without one, and for async callbacks.
+ private final ExecutorService publicExecutor;
+ // Scheduler that periodically invokes scanResponseTable() once startUpHouseKeepingService() was called.
+ protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
+ private InterceptorGroup interceptorGroup = new InterceptorGroup();
+ private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
+
+ /**
+ * Creates the remoting base with a default-constructed {@link RemotingCommandFactoryMeta}.
+ *
+ * @param clientConfig configuration supplying semaphore sizes and callback thread count
+ */
+ NettyRemotingAbstract(RemotingConfig clientConfig) {
+ this(clientConfig, new RemotingCommandFactoryMeta());
+ }
+
+ NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
+ this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
+ this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
+ this.publicExecutor = ThreadUtils.newThreadPoolExecutor(clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(), 60,
+ TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000), "PublicExecutor", true);
+ this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
+ }
+
+ /**
+ * @return the serializer factory owned by this instance
+ */
+ public SerializerFactory getSerializerFactory() {
+ return serializerFactory;
+ }
+
+ /**
+ * Hands a channel event to the {@link ChannelEventExecutor} queue.
+ *
+ * @param event the event to enqueue
+ */
+ protected void putNettyEvent(final NettyChannelEvent event) {
+ this.channelEventExecutor.putNettyEvent(event);
+ }
+
+ protected void startUpHouseKeepingService() {
+ this.houseKeepingService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ scanResponseTable();
+ }
+ }, 3000, 1000, TimeUnit.MICROSECONDS);
+ }
+
+    /**
+     * Starts the channel event dispatcher thread, but only when at least one channel event
+     * listener has been registered.
+     */
+    @Override
+    public void start() {
+        if (this.channelEventListenerGroup.size() <= 0) {
+            return;
+        }
+        this.channelEventExecutor.start();
+    }
+
+ /**
+ * Shuts down the shared executor (passing 2000 ms to the shutdown helper) and the channel
+ * event dispatcher thread via {@code ThreadUtils.shutdownGracefully}.
+ */
+ @Override
+ public void stop() {
+ ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
+ ThreadUtils.shutdownGracefully(channelEventExecutor);
+ }
+
+    /**
+     * Entry point for every decoded command: requests of any kind go to
+     * {@link #processRequestCommand}, responses go to {@code processResponseCommand}.
+     * A {@code null} command is ignored; an unknown traffic type is only logged.
+     *
+     * @param ctx     the handler context the command arrived on
+     * @param command the decoded command, may be {@code null}
+     * @throws Exception declared for subclasses / callers; nothing is thrown explicitly here
+     */
+    protected void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand command) throws Exception {
+        if (command == null) {
+            return;
+        }
+
+        switch (command.trafficType()) {
+            case REQUEST_ONEWAY:
+            case REQUEST_ASYNC:
+            case REQUEST_SYNC:
+                processRequestCommand(ctx, command);
+                break;
+            case RESPONSE:
+                processResponseCommand(ctx, command);
+                break;
+            default:
+                LOG.warn("Not supported The traffic type {} !", command.trafficType());
+                break;
+        }
+    }
+
+    /**
+     * Runs the processor registered for the request's code on that processor's executor.
+     *
+     * <p>When no processor is registered for the code, or when the executor rejects the task,
+     * every request except a oneway request is answered with an error response so the caller
+     * does not have to wait for its timeout. Previously an unregistered code caused a
+     * {@link NullPointerException} and no reply at all.
+     *
+     * @param ctx the handler context the request arrived on
+     * @param cmd the request command
+     */
+    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
+        final Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.opCode());
+
+        if (processorExecutorPair == null) {
+            LOG.warn("No request processor registered for request code {} from {} !", cmd.opCode(),
+                extractRemoteAddress(ctx.channel()));
+
+            if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
+                RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+                response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+                response.remark("REQUEST_CODE_NOT_SUPPORTED");
+                writeAndFlush(ctx.channel(), response);
+            }
+            return;
+        }
+
+        RemotingChannel channel = new NettyChannelImpl(ctx.channel());
+
+        Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel);
+
+        try {
+            processorExecutorPair.getRight().submit(run);
+        } catch (RejectedExecutionException e) {
+            // Crude log sampling: only log when the wall clock happens to hit a 10 s boundary.
+            if ((System.currentTimeMillis() % 10000) == 0) {
+                LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd,
+                    extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
+            }
+
+            if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
+                interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE,
+                    extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL"));
+
+                RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+                response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+                response.remark("SYSTEM_BUSY");
+                writeAndFlush(ctx.channel(), response);
+            }
+        }
+    }
+
+ @NotNull
+ private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd,
+ final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE,
+ extractRemoteAddress(ctx.channel()), cmd));
+
+ RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd);
+
+ interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE,
+ extractRemoteAddress(ctx.channel()), cmd, response));
+
+ handleResponse(response, cmd, ctx);
+ } catch (Throwable e) {
+ LOG.error(String.format("Process request %s error !", cmd.toString()), e);
+
+ handleException(e, cmd, ctx);
+ }
+ }
+ };
+ }
+
+    /**
+     * Reports a processing failure to the exception interceptors and, unless the request was
+     * oneway, sends an error response carrying the serialized exception.
+     *
+     * <p>A failure inside an interceptor is logged and ignored so that the error response is
+     * still written; otherwise the caller would only learn about the failure by timing out.
+     *
+     * @param e   the failure raised while processing {@code cmd}
+     * @param cmd the request command
+     * @param ctx the handler context the request arrived on
+     */
+    private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) {
+        if (cmd.trafficType() == TrafficType.REQUEST_ONEWAY) {
+            return;
+        }
+
+        try {
+            interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
+        } catch (Throwable interceptorFailure) {
+            // Same guard and message as in the invoke*WithInterceptor methods.
+            LOG.warn("onException ", interceptorFailure);
+        }
+
+        RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+        response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+        response.remark(serializeException(cmd.serializerType(), e));
+        response.property("Exception", e.getClass().getName());
+        ctx.writeAndFlush(response);
+    }
+
+ /**
+ * Encodes an exception with the serializer registered for {@code serializeType} and returns
+ * the {@code toString()} of the encoded value.
+ *
+ * NOTE(review): if {@code Serializer.encode} returns a byte array or buffer, {@code toString()}
+ * yields an object description rather than the payload - confirm against the Serializer API.
+ *
+ * @param serializeType serializer type id taken from the request
+ * @param exception     the exception to encode
+ * @return string form of the encoded exception
+ */
+ private String serializeException(byte serializeType, Throwable exception) {
+ final Serializer serialization = getSerializerFactory().get(serializeType);
+ return serialization.encode(exception).toString();
+ }
+
+    /**
+     * Writes the processor's response back to the caller. Nothing is written for oneway
+     * requests or when the processor returned {@code null}; a write failure is only logged.
+     *
+     * @param response the response produced by the processor, may be {@code null}
+     * @param cmd      the request command
+     * @param ctx      the handler context the request arrived on
+     */
+    private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
+        if (cmd.trafficType() == TrafficType.REQUEST_ONEWAY || response == null) {
+            return;
+        }
+
+        try {
+            writeAndFlush(ctx.channel(), response);
+        } catch (Throwable writeFailure) {
+            LOG.error(String.format("Process request %s success, but transfer response %s failed !",
+                cmd.toString(), response.toString()), writeFailure);
+        }
+    }
+
+ private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
+ final ResponseResult responseResult = ackTables.get(cmd.requestID());
+ if (responseResult != null) {
+ responseResult.setResponseCommand(cmd);
+ responseResult.release();
+
+ long time = System.currentTimeMillis();
+ ackTables.remove(cmd.requestID());
+ if (count.incrementAndGet() % 5000 == 0)
+ LOG.warn("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(),
+ ackTables.size());
+ if (responseResult.getAsyncHandler() != null) {
+ boolean sameThread = false;
+ ExecutorService executor = this.getCallbackExecutor();
+ if (executor != null) {
+ try {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ responseResult.executeCallbackArrived(responseResult.getResponseCommand());
+ } catch (Throwable e) {
+ LOG.warn("Execute callback error !", e);
+ }
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ sameThread = true;
+ LOG.warn("Execute submit error !", e);
+ }
+ } else {
+ sameThread = true;
+ }
+
+ if (sameThread) {
+ try {
+ responseResult.executeCallbackArrived(responseResult.getResponseCommand());
+ } catch (Throwable e) {
+ LOG.warn("Execute callback in response thread error !", e);
+ }
+ }
+ } else {
+ responseResult.putResponse(cmd);
+ }
+ } else {
+ LOG.warn("request {} from {} has not matched response !", cmd, extractRemoteAddress(ctx.channel()));
+ }
+ }
+
+ /**
+ * Writes and flushes {@code msg} on the channel and registers {@code listener} on the
+ * resulting future.
+ */
+ private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) {
+ channel.writeAndFlush(msg).addListener(listener);
+ }
+
+ /**
+ * Writes and flushes {@code msg} on the channel; the outcome of the write is not observed.
+ */
+ private void writeAndFlush(final Channel channel, final Object msg) {
+ channel.writeAndFlush(msg);
+ }
+
+ /**
+ * @return the executor used for async callbacks; this implementation returns the shared
+ * public executor
+ */
+ public ExecutorService getCallbackExecutor() {
+ return this.publicExecutor;
+ }
+
+ /**
+ * Periodic house-keeping hook invoked by the task that startUpHouseKeepingService() schedules.
+ *
+ * Currently a no-op: the whole body below is commented out. The disabled code would walk
+ * ackTables, remove every entry whose begin timestamp plus timeout has passed, invoke its
+ * timeout callback and release it.
+ *
+ * NOTE(review): while this stays disabled, entries are removed from ackTables only when a
+ * response arrives, when the send fails, or in the finally block of invoke0 - confirm that
+ * async requests which never get a response are expired elsewhere.
+ */
+ void scanResponseTable() {
+ /*
+ Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, ResponseResult> next = iterator.next();
+ ResponseResult result = next.getValue();
+
+ if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) {
+ iterator.remove();
+ try {
+ long timeoutMillis = result.getTimeoutMillis();
+ long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
+ result.onTimeout(timeoutMillis, costTimeMillis);
+ LOG.error("scan response table command {} failed", result.getRequestId());
+ } catch (Throwable e) {
+ LOG.warn("Error occurred when execute timeout callback !", e);
+ } finally {
+ result.release();
+ LOG.warn("Removed timeout request {} ", result);
+ }
+ }
+ }
+ */
+ }
+
+    /**
+     * Sends a synchronous request and waits for its response, running the request and response
+     * interceptors around the call.
+     *
+     * @param channel       the channel to send on
+     * @param request       the request; its traffic type is set to {@code REQUEST_SYNC}
+     * @param timeoutMillis how long to wait for the response
+     * @return the response command as returned by {@code invoke0}
+     */
+    public RemotingCommand invokeWithInterceptor(final Channel channel, final RemotingCommand request,
+        long timeoutMillis) {
+        request.trafficType(TrafficType.REQUEST_SYNC);
+
+        final String remoteAddr = extractRemoteAddress(channel);
+
+        //FIXME try catch here
+        final RequestContext outgoing = new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request);
+        this.interceptorGroup.beforeRequest(outgoing);
+
+        final RemotingCommand reply = this.invoke0(remoteAddr, channel, request, timeoutMillis);
+
+        final ResponseContext incoming = new ResponseContext(RemotingEndPoint.REQUEST,
+            extractRemoteAddress(channel), request, reply);
+        this.interceptorGroup.afterResponseReceived(incoming);
+
+        return reply;
+    }
+
+ private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request,
+ final long timeoutMillis) {
+ try {
+ final int opaque = request.requestID();
+ final ResponseResult responseResult = new ResponseResult(opaque, timeoutMillis);
+ responseResult.setRequestCommand(request);
+ //FIXME one interceptor for all case ?
+ responseResult.setInterceptorGroup(this.interceptorGroup);
+ responseResult.setRemoteAddr(remoteAddr);
+
+ this.ackTables.put(opaque, responseResult);
+
+ ChannelFutureListener listener = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture f) throws Exception {
+ if (f.isSuccess()) {
+ responseResult.setSendRequestOK(true);
+ return;
+ } else {
+ responseResult.setSendRequestOK(false);
+
+ ackTables.remove(opaque);
+ responseResult.setCause(f.cause());
+ responseResult.putResponse(null);
+
+ LOG.warn("Send request command to {} failed !", remoteAddr);
+ }
+ }
+ };
+
+ this.writeAndFlush(channel, request, listener);
+
+ RemotingCommand responseCommand = responseResult.waitResponse(timeoutMillis);
+
+ if (null == responseCommand) {
+ if (responseResult.isSendRequestOK()) {
+ throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause());
+ }
+ /*
+ else {
+ throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause());
+ }*/
+ }
+
+ return responseCommand;
+ } finally {
+ this.ackTables.remove(request.requestID());
+ }
+ }
+
+    /**
+     * Sends an asynchronous request, running the request interceptors first.
+     *
+     * <p>If the calling thread is interrupted while waiting for an async permit, the interrupt
+     * status is restored (it used to be silently cleared) and the exception interceptors are
+     * notified; a failure inside an interceptor is only logged.
+     *
+     * @param channel        the channel to send on
+     * @param request        the request; its traffic type is set to {@code REQUEST_ASYNC}
+     * @param invokeCallback handler notified about the outcome
+     * @param timeoutMillis  timeout for acquiring the permit and for the request itself
+     */
+    public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request,
+        final AsyncHandler invokeCallback, long timeoutMillis) {
+        request.trafficType(TrafficType.REQUEST_ASYNC);
+
+        final String remoteAddr = extractRemoteAddress(channel);
+
+        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
+
+        try {
+            this.invokeAsync0(remoteAddr, channel, request, timeoutMillis, invokeCallback);
+        } catch (InterruptedException interrupted) {
+            // Keep the interruption visible to the caller's thread.
+            Thread.currentThread().interrupt();
+            try {
+                this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, interrupted, "REMOTING_EXCEPTION"));
+            } catch (Throwable e) {
+                LOG.warn("onException ", e);
+            }
+        }
+    }
+
+    /**
+     * Acquires an async permit, registers the request as pending and writes it.
+     *
+     * <p>Fixes over the previous version: when the write itself throws, the pending entry is
+     * now removed from {@code ackTables} as well (it used to stay there), and the send-failure
+     * log message now contains the placeholder for the remote address.
+     *
+     * @param remoteAddr     remote address used for bookkeeping and logging
+     * @param channel        the channel to send on
+     * @param request        the request command
+     * @param timeoutMillis  timeout for acquiring the permit and for the request itself
+     * @param invokeCallback handler notified about the outcome
+     * @throws InterruptedException   if interrupted while waiting for a permit
+     * @throws RemoteTimeoutException if no permit could be acquired within the timeout
+     */
+    private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request,
+        final long timeoutMillis, final AsyncHandler invokeCallback) throws InterruptedException {
+        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+        if (!acquired) {
+            String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
+                timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
+            LOG.error(info);
+            throw new RemoteTimeoutException(info);
+        }
+
+        final int requestID = request.requestID();
+
+        // Guarantees the permit is handed back at most once, whichever path releases it.
+        SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
+
+        final ResponseResult responseResult = new ResponseResult(requestID, timeoutMillis, invokeCallback, once);
+        responseResult.setRequestCommand(request);
+        responseResult.setInterceptorGroup(this.interceptorGroup);
+        responseResult.setRemoteAddr(remoteAddr);
+
+        this.ackTables.put(requestID, responseResult);
+        try {
+            ChannelFutureListener listener = new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture f) throws Exception {
+                    responseResult.setSendRequestOK(f.isSuccess());
+                    if (f.isSuccess()) {
+                        return;
+                    }
+
+                    responseResult.putResponse(null);
+                    ackTables.remove(requestID);
+                    try {
+                        responseResult.executeRequestSendFailed();
+                    } catch (Throwable e) {
+                        LOG.warn("Execute callback error !", e);
+                    } finally {
+                        responseResult.release();
+                    }
+
+                    LOG.warn("Send request command to channel {} failed.", remoteAddr);
+                }
+            };
+
+            this.writeAndFlush(channel, request, listener);
+        } catch (Exception e) {
+            // The request never left; do not keep it registered as pending.
+            this.ackTables.remove(requestID);
+            responseResult.release();
+            LOG.error("Send request command to channel " + channel + " error !", e);
+        }
+    }
+
+    /**
+     * Sends a oneway request (no response expected), running the request interceptors first.
+     *
+     * <p>If the calling thread is interrupted while waiting for a oneway permit, the interrupt
+     * status is restored (it used to be silently cleared) and the exception interceptors are
+     * notified; a failure inside an interceptor is only logged.
+     *
+     * @param channel       the channel to send on
+     * @param request       the request; its traffic type is set to {@code REQUEST_ONEWAY}
+     * @param timeoutMillis timeout for acquiring the permit
+     */
+    public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request, long timeoutMillis) {
+        request.trafficType(TrafficType.REQUEST_ONEWAY);
+
+        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request));
+
+        try {
+            this.invokeOneway0(channel, request, timeoutMillis);
+        } catch (InterruptedException interrupted) {
+            // Keep the interruption visible to the caller's thread.
+            Thread.currentThread().interrupt();
+            try {
+                this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, interrupted, "REMOTING_EXCEPTION"));
+            } catch (Throwable e) {
+                LOG.warn("onException ", e);
+            }
+        }
+    }
+
+    /**
+     * Acquires a oneway permit and writes the request; the permit is released when the write
+     * completes (successfully or not) or when the write throws.
+     *
+     * <p>The timeout message now reports the oneway semaphore's statistics; it used to report
+     * those of the async semaphore.
+     *
+     * @param channel       the channel to send on
+     * @param request       the request command
+     * @param timeoutMillis timeout for acquiring the permit
+     * @throws InterruptedException   if interrupted while waiting for a permit
+     * @throws RemoteTimeoutException if no permit could be acquired within the timeout
+     */
+    private void invokeOneway0(final Channel channel, final RemotingCommand request,
+        final long timeoutMillis) throws InterruptedException {
+        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+        if (!acquired) {
+            String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
+                timeoutMillis, request.toString(), this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits());
+            LOG.error(info);
+            throw new RemoteTimeoutException(info);
+        }
+
+        // Guarantees the permit is handed back at most once, whichever path releases it.
+        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
+        try {
+            final SocketAddress socketAddress = channel.remoteAddress();
+
+            ChannelFutureListener listener = new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture f) throws Exception {
+                    once.release();
+                    if (!f.isSuccess()) {
+                        LOG.warn("Send request command to channel {} failed !", socketAddress);
+                    }
+                }
+            };
+
+            this.writeAndFlush(channel, request, listener);
+        } catch (Exception e) {
+            once.release();
+            LOG.error("Send request command to channel " + channel + " error !", e);
+        }
+    }
+
+ /**
+ * @return the id generated for this remoting instance when it was constructed
+ */
+ public String getRemotingInstanceId() {
+ return remotingInstanceId;
+ }
+
+ /**
+ * @return the protocol factory owned by this instance
+ */
+ @Override
+ public ProtocolFactory protocolFactory() {
+ return this.protocolFactory;
+ }
+
+ /**
+ * @return the serializer factory owned by this instance
+ */
+ @Override
+ public SerializerFactory serializerFactory() {
+ return this.serializerFactory;
+ }
+
+ /**
+ * @return the command factory created in the constructor
+ */
+ @Override
+ public RemotingCommandFactory commandFactory() {
+ return this.remotingCommandFactory;
+ }
+
+    /**
+     * Registers a processor and the executor it runs on for a request code. If the code is
+     * already registered the call is ignored and the existing registration is kept.
+     *
+     * @param requestCode the request code
+     * @param processor   the processor handling that code
+     * @param executor    the executor the processor runs on
+     */
+    @Override
+    public void registerRequestProcessor(String requestCode, RequestProcessor processor, ExecutorService executor) {
+        final Pair<RequestProcessor, ExecutorService> registration =
+            new Pair<RequestProcessor, ExecutorService>(processor, executor);
+        if (this.processorTables.containsKey(requestCode)) {
+            return;
+        }
+        this.processorTables.put(requestCode, registration);
+    }
+
+ /**
+ * Registers a processor for a request code, running it on the shared public executor.
+ */
+ @Override
+ public void registerRequestProcessor(String requestCode, RequestProcessor processor) {
+ this.registerRequestProcessor(requestCode, processor, publicExecutor);
+ }
+
+ /**
+ * Removes the processor registration for a request code, if any.
+ */
+ @Override
+ public void unregisterRequestProcessor(String requestCode) {
+ this.processorTables.remove(requestCode);
+ }
+
+ /**
+ * @return the same value as {@link #getRemotingInstanceId()}
+ */
+ @Override
+ public String remotingInstanceId() {
+ return this.getRemotingInstanceId();
+ }
+
+ /**
+ * Adds an interceptor to the interceptor group used around requests and responses.
+ */
+ @Override
+ public void registerInterceptor(Interceptor interceptor) {
+ this.interceptorGroup.registerInterceptor(interceptor);
+ }
+
+ /**
+ * Adds a channel event listener. start() only launches the event dispatcher thread when at
+ * least one listener is registered at that point.
+ */
+ @Override
+ public void registerChannelEventListener(ChannelEventListener listener) {
+ this.channelEventListenerGroup.registerChannelEventListener(listener);
+ }
+
+ /**
+ * @return the (processor, executor) pair registered for the request code, or {@code null}
+ * when none is registered
+ */
+ @Override
+ public Pair<RequestProcessor, ExecutorService> processor(String requestCode) {
+ return processorTables.get(requestCode);
+ }
+
+ protected String extractRemoteAddress(Channel channel) {
+ return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
+ }
+
+    /**
+     * Thread that takes queued channel events and forwards each one to the matching callback
+     * of the registered channel event listeners.
+     *
+     * <p>The loop ends only when the thread is interrupted. A failure raised by a listener is
+     * logged and the loop continues; previously any exception ended event dispatching for good.
+     */
+    class ChannelEventExecutor extends Thread {
+        private final static int MAX_SIZE = 10000;
+        private final LinkedBlockingQueue<NettyChannelEvent> eventQueue = new LinkedBlockingQueue<NettyChannelEvent>();
+        private String name;
+
+        public ChannelEventExecutor(String nettyEventExector) {
+            super(nettyEventExector);
+            this.name = nettyEventExector;
+        }
+        //private final AtomicBoolean isStopped = new AtomicBoolean(true);
+
+        /**
+         * Enqueues an event, or drops it with a warning once the queue holds more than
+         * {@code MAX_SIZE} events.
+         *
+         * @param event the event to enqueue
+         */
+        public void putNettyEvent(final NettyChannelEvent event) {
+            if (this.eventQueue.size() <= MAX_SIZE) {
+                this.eventQueue.add(event);
+            } else {
+                LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+            }
+        }
+
+        @Override
+        public void run() {
+            LOG.info(this.name + " service started");
+
+            ChannelEventListenerGroup listener = NettyRemotingAbstract.this.channelEventListenerGroup;
+
+            while (true) {
+                try {
+                    // Wake up every three seconds even without events.
+                    NettyChannelEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
+                    if (event != null && listener != null) {
+                        dispatch(listener, event);
+                    }
+                } catch (InterruptedException e) {
+                    // Interruption is the stop signal for this thread.
+                    LOG.error("error", e);
+                    Thread.currentThread().interrupt();
+                    break;
+                } catch (Exception e) {
+                    // A misbehaving listener must not stop event delivery.
+                    LOG.error("error", e);
+                }
+            }
+        }
+
+        /** Forwards one event to the listener callback that matches its type. */
+        private void dispatch(ChannelEventListenerGroup listener, NettyChannelEvent event) {
+            RemotingChannel channel = new NettyChannelImpl(event.getChannel());
+
+            LOG.warn("Channel Event, {}", event);
+
+            switch (event.getType()) {
+                case IDLE:
+                    listener.onChannelIdle(channel);
+                    break;
+                case INACTIVE:
+                    listener.onChannelClose(channel);
+                    break;
+                case ACTIVE:
+                    listener.onChannelConnect(channel);
+                    break;
+                case EXCEPTION:
+                    listener.onChannelException(channel);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+    }
+
+ /**
+ * Inbound handler that passes every decoded {@code RemotingCommand} to
+ * processMessageReceived of the enclosing instance.
+ */
+ protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+ processMessageReceived(ctx, msg);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
new file mode 100644
index 0000000..7481574
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLException;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
+import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
+import org.apache.rocketmq.remoting.internal.JvmUtils;
+
+public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
+ // Maximum time to wait for lockChannelTables before giving up.
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ private final Bootstrap clientBootstrap = new Bootstrap();
+ // I/O event loop group; epoll or NIO flavour is chosen in the constructor.
+ private final EventLoopGroup ioGroup;
+ private final Class<? extends SocketChannel> socketChannelClass;
+
+ private final RemotingConfig clientConfig;
+
+ // Cached channels keyed by the address string passed to createChannel (split there on ":" into host and port).
+ private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
+ // Guards compound updates of channelTables.
+ private final Lock lockChannelTables = new ReentrantLock();
+ // Executor group the pipeline handlers are added with in start().
+ private EventExecutorGroup workerGroup;
+ // Built in the constructor only when the configured protocol is HTTP2; null otherwise.
+ private SslContext sslContext;
+
+ NettyRemotingClient(final RemotingConfig clientConfig) {
+ super(clientConfig, new RemotingCommandFactoryMeta(clientConfig.getProtocolName(), clientConfig.getSerializerName()));
+ this.clientConfig = clientConfig;
+
+ if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
+ this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+ clientConfig.getClientWorkerThreads()));
+ socketChannelClass = EpollSocketChannel.class;
+ } else {
+ this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
+ clientConfig.getClientWorkerThreads()));
+ socketChannelClass = NioSocketChannel.class;
+ }
+
+ this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
+ ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
+
+ if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) {
+ buildSslContext();
+ }
+ }
+
+    /**
+     * Copies the TCP related settings of the client configuration onto the bootstrap.
+     * SO_LINGER and the send / receive buffer sizes are applied only when configured with a
+     * positive value.
+     *
+     * @param bootstrap the bootstrap to configure
+     */
+    private void applyOptions(Bootstrap bootstrap) {
+        if (null == clientConfig) {
+            return;
+        }
+
+        if (clientConfig.getTcpSoLinger() > 0) {
+            bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger());
+        }
+        if (clientConfig.getTcpSoSndBufSize() > 0) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize());
+        }
+        if (clientConfig.getTcpSoRcvBufSize() > 0) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize());
+        }
+
+        bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive());
+        bootstrap.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout());
+        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
+            new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(), clientConfig.getWriteBufHighWaterMark()));
+    }
+
+    /**
+     * Starts the client: runs the base start-up, configures the bootstrap (event loop group,
+     * channel class, pipeline initializer, TCP options) and schedules the house-keeping task.
+     */
+    @Override
+    public void start() {
+        super.start();
+
+        final ChannelInitializer<SocketChannel> pipelineInitializer = new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+                // For HTTP2 the SSL and HTTP2 handlers go in front of everything else.
+                if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) {
+                    ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()), Http2Handler.newHandler(false));
+                }
+                ch.pipeline().addLast(workerGroup,
+                    new Decoder(),
+                    new Encoder(),
+                    new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
+                        clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
+                    new ClientConnectionHandler(),
+                    new EventDispatcher(),
+                    new ExceptionHandler());
+            }
+        };
+
+        this.clientBootstrap.group(this.ioGroup);
+        this.clientBootstrap.channel(socketChannelClass);
+        this.clientBootstrap.handler(pipelineInitializer);
+
+        applyOptions(clientBootstrap);
+
+        startUpHouseKeepingService();
+    }
+
+    /**
+     * Builds the client SSL context used for the HTTP2 protocol, preferring OpenSSL when it is
+     * available and falling back to the JDK provider.
+     *
+     * <p>A build failure is now reported through the class logger instead of
+     * {@code printStackTrace()}. As before, {@code sslContext} stays {@code null} in that case.
+     */
+    private void buildSslContext() {
+        SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+        try {
+            sslContext = SslContextBuilder.forClient()
+                .sslProvider(provider)
+                /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+                 * Please refer to the HTTP/2 specification for cipher requirements. */
+                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+                // SECURITY(review): InsecureTrustManagerFactory trusts every certificate without
+                // verification; replace with a real trust manager before production use.
+                .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                .build();
+        } catch (SSLException e) {
+            LOG.error("Build client SslContext with provider " + provider + " failed !", e);
+        }
+    }
+
+    /**
+     * Stops the client: shuts down the house-keeping scheduler, closes and forgets all cached
+     * channels, shuts down the I/O group, the event dispatcher and the worker group, then runs
+     * the base shutdown.
+     *
+     * <p>Channels are now closed under the key they are cached with. Previously
+     * {@code closeChannel} was called with a {@code null} address and fell back to the channel's
+     * host address (without port), which does not match the "host:port" keys of the table, so
+     * the channels were not closed here.
+     */
+    @Override
+    public void stop() {
+        ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
+
+        for (Map.Entry<String, ChannelWrapper> entry : this.channelTables.entrySet()) {
+            this.closeChannel(entry.getKey(), entry.getValue().getChannel());
+        }
+
+        this.channelTables.clear();
+
+        this.ioGroup.shutdownGracefully();
+
+        ThreadUtils.shutdownGracefully(channelEventExecutor);
+
+        this.workerGroup.shutdownGracefully();
+
+        super.stop();
+    }
+
+ /**
+  * Closes {@code channel} and, if the channel table still maps the remote address to this very
+  * channel, removes that mapping.
+  * <p>
+  * Nothing happens when {@code channel} is {@code null}, when the table has no entry for the
+  * address, or when the table lock cannot be acquired within {@code LOCK_TIMEOUT_MILLIS}.
+  *
+  * @param addr    remote address used as the table key; when {@code null} it is derived from
+  *                {@code channel} via {@code extractRemoteAddress}
+  * @param channel the channel to close; may be {@code null}
+  */
+ private void closeChannel(final String addr, final Channel channel) {
+     if (null == channel) {
+         return;
+     }
+
+     final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
+     try {
+         if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+             try {
+                 ChannelWrapper prevCW = this.channelTables.get(addrRemote);
+                 //Workaround for null
+                 if (null == prevCW) {
+                     return;
+                 }
+
+                 LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW);
+
+                 if (prevCW.getChannel() != channel) {
+                     // The table already holds a different channel for this address: keep that
+                     // entry and only close the stale channel passed in.
+                     LOG.info("Channel {} has been closed,this is a new channel {}.", channel, prevCW.getChannel());
+                 } else {
+                     this.channelTables.remove(addrRemote);
+                     LOG.info("Channel {} has been removed !", addrRemote);
+                 }
+
+                 channel.close().addListener(new ChannelFutureListener() {
+                     @Override
+                     public void operationComplete(ChannelFuture future) throws Exception {
+                         LOG.warn("Close channel {} {}", channel, future.isSuccess());
+                     }
+                 });
+             } catch (Exception e) {
+                 LOG.error("Close channel error !", e);
+             } finally {
+                 this.lockChannelTables.unlock();
+             }
+         } else {
+             LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS);
+         }
+     } catch (InterruptedException e) {
+         LOG.error("Close channel error !", e);
+         // Preserve the interrupt so callers further up the stack can still observe it.
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ private Channel createIfAbsent(final String addr) {
+ ChannelWrapper cw = this.channelTables.get(addr);
+ if (cw != null && cw.isActive()) {
+ return cw.getChannel();
+ }
+ return this.createChannel(addr);
+ }
+
+ //FIXME need test to verify
+ /**
+  * Returns an active channel to {@code addr}, starting a new connection when the channel table
+  * has no usable entry.
+  * <p>
+  * Under the table lock the existing entry is reused if active, awaited if its connect future
+  * is still pending, and replaced by a fresh connect attempt if the future is done but the
+  * channel is not active. Outside the lock the method waits for the connect future up to the
+  * configured await timeout.
+  *
+  * @param addr remote address in {@code host:port} form
+  * @return the active channel, or {@code null} when the lock could not be taken, the wait was
+  *         interrupted, or the connection failed or timed out
+  */
+ private Channel createChannel(final String addr) {
+     ChannelWrapper cw = null;
+     try {
+         if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+             try {
+                 boolean createNewConnection;
+                 cw = this.channelTables.get(addr);
+                 if (cw != null) {
+                     if (cw.isActive()) {
+                         return cw.getChannel();
+                     } else if (!cw.getChannelFuture().isDone()) {
+                         // A connect attempt is still in flight: wait on it below.
+                         createNewConnection = false;
+                     } else {
+                         // Finished but not active: drop the dead entry and reconnect.
+                         this.channelTables.remove(addr);
+                         createNewConnection = true;
+                     }
+                 } else {
+                     createNewConnection = true;
+                 }
+
+                 if (createNewConnection) {
+                     String[] s = addr.split(":");
+                     SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.parseInt(s[1]));
+                     ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
+                     LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+                     cw = new ChannelWrapper(channelFuture);
+                     this.channelTables.put(addr, cw);
+                 }
+             } catch (Exception e) {
+                 LOG.error("createChannel: create channel exception", e);
+             } finally {
+                 this.lockChannelTables.unlock();
+             }
+         } else {
+             LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+         }
+     } catch (InterruptedException e) {
+         LOG.warn("createChannel: interrupted while waiting for the channel table lock", e);
+         // Preserve the interrupt so callers further up the stack can still observe it.
+         Thread.currentThread().interrupt();
+     }
+
+     if (cw != null) {
+         ChannelFuture channelFuture = cw.getChannelFuture();
+         if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+             if (cw.isActive()) {
+                 LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+                 return cw.getChannel();
+             } else {
+                 LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
+                 this.closeChannel(addr, cw.getChannel());
+             }
+         } else {
+             LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+                 channelFuture.toString());
+             this.closeChannel(addr, cw.getChannel());
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Removes the channel table entry whose wrapped channel is {@code channel}, if there is one.
+  * <p>
+  * This overload only unregisters the channel; it does not call {@code close()} on it.
+  * Nothing happens when {@code channel} is {@code null} or when the table lock cannot be
+  * acquired within {@code LOCK_TIMEOUT_MILLIS}.
+  *
+  * @param channel the channel to unregister; may be {@code null}
+  */
+ private void closeChannel(final Channel channel) {
+     if (null == channel) {
+         return;
+     }
+
+     try {
+         if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+             try {
+                 ChannelWrapper prevCW = null;
+                 String addrRemote = null;
+
+                 // The table is keyed by address, so find the entry by channel identity.
+                 for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+                     ChannelWrapper prev = entry.getValue();
+                     if (prev.getChannel() == channel) {
+                         prevCW = prev;
+                         addrRemote = entry.getKey();
+                         break;
+                     }
+                 }
+
+                 if (null == prevCW) {
+                     // No address is known on this path, so report the channel itself.
+                     LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", channel);
+                 } else {
+                     this.channelTables.remove(addrRemote);
+                     LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                 }
+             } catch (Exception e) {
+                 LOG.error("closeChannel: close the channel exception", e);
+             } finally {
+                 this.lockChannelTables.unlock();
+             }
+         } else {
+             LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+         }
+     } catch (InterruptedException e) {
+         LOG.error("closeChannel exception", e);
+         // Preserve the interrupt so callers further up the stack can still observe it.
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ /**
+  * Sends {@code request} to {@code address} synchronously and returns the response.
+  * <p>
+  * The request is marked as {@code TrafficType.REQUEST_SYNC}. When no active channel can be
+  * obtained a {@code RemoteConnectFailureException} is thrown. On a
+  * {@code RemoteTimeoutException} the channel is closed only if the client configuration asks
+  * for it, and the exception is rethrown.
+  *
+  * @param address       remote address to invoke
+  * @param request       the command to send
+  * @param timeoutMillis timeout passed to the underlying invocation
+  * @return the response command
+  */
+ @Override
+ public RemotingCommand invoke(final String address, final RemotingCommand request, final long timeoutMillis) {
+     request.trafficType(TrafficType.REQUEST_SYNC);
+
+     final Channel channel = this.createIfAbsent(address);
+     if (channel == null || !channel.isActive()) {
+         this.closeChannel(address, channel);
+         throw new RemoteConnectFailureException(address);
+     }
+
+     try {
+         return this.invokeWithInterceptor(channel, request, timeoutMillis);
+     } catch (RemoteTimeoutException timeout) {
+         if (this.clientConfig.isClientCloseSocketIfTimeout()) {
+             LOG.warn("invoke: timeout, so close the socket {} ms, {}", timeoutMillis, address);
+             this.closeChannel(address, channel);
+         }
+
+         LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address);
+         throw timeout;
+     }
+ }
+
+ /**
+  * Sends {@code request} to {@code address} asynchronously, handing {@code asyncHandler} to the
+  * underlying invocation.
+  * <p>
+  * When no active channel can be obtained the channel is closed and the method returns.
+  * NOTE(review): {@code asyncHandler} does not appear to be notified on that path - confirm
+  * whether callers rely on a failure callback.
+  *
+  * @param address       remote address to invoke
+  * @param request       the command to send
+  * @param asyncHandler  handler passed through to the asynchronous invocation
+  * @param timeoutMillis timeout passed to the underlying invocation
+  */
+ @Override
+ public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler,
+     final long timeoutMillis) {
+
+     final Channel channel = this.createIfAbsent(address);
+     if (channel == null || !channel.isActive()) {
+         this.closeChannel(address, channel);
+         return;
+     }
+
+     // An unwritable channel is only reported here; the request is still handed over, and it
+     // is up to the layer above the transport to decide whether to requeue a canceled write.
+     if (!channel.isWritable()) {
+         LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
+     }
+     this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis);
+ }
+
+ /**
+  * Sends {@code request} to {@code address} through the one-way invocation path.
+  * <p>
+  * When no active channel can be obtained the channel is closed and the method returns
+  * without sending anything.
+  *
+  * @param address       remote address to invoke
+  * @param request       the command to send
+  * @param timeoutMillis timeout passed to the underlying invocation
+  */
+ @Override
+ public void invokeOneWay(final String address, final RemotingCommand request, final long timeoutMillis) {
+     final Channel channel = this.createIfAbsent(address);
+     if (channel == null || !channel.isActive()) {
+         this.closeChannel(address, channel);
+         return;
+     }
+
+     // An unwritable channel is only reported; the request is still sent.
+     if (!channel.isWritable()) {
+         LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
+     }
+     this.invokeOnewayWithInterceptor(channel, request, timeoutMillis);
+ }
+
+ /**
+  * Holds the {@code ChannelFuture} of a connect attempt and exposes the channel behind it.
+  */
+ private class ChannelWrapper {
+     private final ChannelFuture channelFuture;
+
+     ChannelWrapper(ChannelFuture channelFuture) {
+         this.channelFuture = channelFuture;
+     }
+
+     /** @return {@code true} when the future has a channel and that channel is active */
+     boolean isActive() {
+         final Channel current = getChannel();
+         return current != null && current.isActive();
+     }
+
+     /** @return whether the channel behind the future reports itself writable */
+     boolean isWriteable() {
+         return getChannel().isWritable();
+     }
+
+     /** @return the channel associated with the wrapped future */
+     private Channel getChannel() {
+         return channelFuture.channel();
+     }
+
+     /** @return the wrapped connect future */
+     ChannelFuture getChannelFuture() {
+         return this.channelFuture;
+     }
+ }
+
+ /**
+  * Duplex handler that logs connection lifecycle events, unregisters channels from the channel
+  * table and publishes {@code NettyChannelEvent}s through {@code putNettyEvent}.
+  */
+ private class ClientConnectionHandler extends ChannelDuplexHandler {
+
+     /** Logs the channel's write-buffer statistics whenever its writability changes. */
+     @Override
+     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+         final Channel ch = ctx.channel();
+         LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ch,
+             ch.bytesBeforeUnwritable(), ch.bytesBeforeWritable());
+     }
+
+     /** Logs the connect request, forwards it, then publishes an ACTIVE event. */
+     @Override
+     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
+         ChannelPromise promise)
+         throws Exception {
+         LOG.info("Connected from {} to {}.", localAddress, remoteAddress);
+         super.connect(ctx, remoteAddress, localAddress, promise);
+
+         putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+     }
+
+     /** Unregisters the channel, forwards the disconnect, then publishes an INACTIVE event. */
+     @Override
+     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+         final Channel ch = ctx.channel();
+         LOG.info("Remote address {} disconnect channel {}.", ch.remoteAddress(), ch);
+
+         closeChannel(ch);
+
+         super.disconnect(ctx, promise);
+
+         putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+     }
+
+     /** Unregisters the channel, forwards the close, then publishes an INACTIVE event. */
+     @Override
+     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+         final Channel ch = ctx.channel();
+         LOG.info("Remote address {} close channel {}.", ch.remoteAddress(), ch);
+
+         closeChannel(ch);
+
+         super.close(ctx, promise);
+
+         putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+     }
+
+     /**
+      * On an ALL_IDLE event unregisters the channel and publishes an IDLE event; every user
+      * event is forwarded down the pipeline afterwards.
+      * NOTE(review): the single-argument closeChannel only removes the table entry - confirm
+      * that the idle channel itself gets closed elsewhere.
+      */
+     @Override
+     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+         if (evt instanceof IdleStateEvent) {
+             final IdleStateEvent idle = (IdleStateEvent) evt;
+             if (idle.state().equals(IdleState.ALL_IDLE)) {
+                 LOG.info("Close channel {} because of idle event {} ", ctx.channel(), idle);
+                 closeChannel(ctx.channel());
+                 putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel()));
+             }
+         }
+
+         ctx.fireUserEventTriggered(evt);
+     }
+
+     /**
+      * Logs the error, unregisters the channel and publishes an EXCEPTION event. The exception
+      * is not forwarded by this handler.
+      */
+     @Override
+     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+         LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
+
+         closeChannel(ctx.channel());
+         putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
+     }
+ }
+}