You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:31 UTC
[24/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
index c9e534f..4705bed 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
@@ -6,30 +6,28 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.namesrv.kvconfig;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.body.KVTable;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KVConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
@@ -38,19 +36,17 @@ public class KVConfigManager {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
- new HashMap<String, HashMap<String, String>>();
-
+ new HashMap<String, HashMap<String, String>>();
public KVConfigManager(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
-
public void load() {
String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
if (content != null) {
KVConfigSerializeWrapper kvConfigSerializeWrapper =
- KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
+ KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
if (null != kvConfigSerializeWrapper) {
this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
log.info("load KV config table OK");
@@ -58,7 +54,6 @@ public class KVConfigManager {
}
}
-
public void putKVConfig(final String namespace, final String key, final String value) {
try {
this.lock.writeLock().lockInterruptibly();
@@ -73,10 +68,10 @@ public class KVConfigManager {
final String prev = kvTable.put(key, value);
if (null != prev) {
log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", //
- namespace, key, value);
+ namespace, key, value);
} else {
log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", //
- namespace, key, value);
+ namespace, key, value);
}
} finally {
this.lock.writeLock().unlock();
@@ -102,7 +97,7 @@ public class KVConfigManager {
}
} catch (IOException e) {
log.error("persist kvconfig Exception, "
- + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
+ + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
} finally {
this.lock.readLock().unlock();
}
@@ -120,7 +115,7 @@ public class KVConfigManager {
if (null != kvTable) {
String value = kvTable.remove(key);
log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", //
- namespace, key, value);
+ namespace, key, value);
}
} finally {
this.lock.writeLock().unlock();
@@ -179,14 +174,14 @@ public class KVConfigManager {
{
log.info("configTable SIZE: {}", this.configTable.size());
Iterator<Entry<String, HashMap<String, String>>> it =
- this.configTable.entrySet().iterator();
+ this.configTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<String, String>> next = it.next();
Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator();
while (itSub.hasNext()) {
Entry<String, String> nextSub = itSub.next();
log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(),
- nextSub.getValue());
+ nextSub.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
index 6465927..e35a37c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
@@ -6,30 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.namesrv.kvconfig;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
import java.util.HashMap;
-
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class KVConfigSerializeWrapper extends RemotingSerializable {
private HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;
-
public HashMap<String, HashMap<String, String>> getConfigTable() {
return configTable;
}
-
public void setConfigTable(HashMap<String, HashMap<String, String>> configTable) {
this.configTable = configTable;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index 9ee56a4..95410fa 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -6,16 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.namesrv.processor;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
@@ -27,17 +28,14 @@ import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final DefaultMQAdminExt adminExt;
private final String productEnvName;
-
public ClusterTestRequestProcessor(NamesrvController namesrvController, String productEnvName) {
super(namesrvController);
this.productEnvName = productEnvName;
@@ -51,18 +49,17 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
}
}
-
@Override
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
- (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+ (GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
String orderTopicConf =
- this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
- requestHeader.getTopic());
+ this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
+ requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
} else {
try {
@@ -82,7 +79,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
- + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+ + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index e47f300..0135274 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -6,16 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.namesrv.processor;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MQVersion.Version;
import org.apache.rocketmq.common.MixAll;
@@ -28,40 +32,43 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.*;
+import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.DeleteTopicInNamesrvRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-
public class DefaultRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
protected final NamesrvController namesrvController;
-
public DefaultRequestProcessor(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
-
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
- request.getCode(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- request);
+ request.getCode(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ request);
}
switch (request.getCode()) {
@@ -75,8 +82,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
- }
- else {
+ } else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
@@ -121,12 +127,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final PutKVConfigRequestHeader requestHeader =
- (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
+ (PutKVConfigRequestHeader)request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
this.namesrvController.getKvConfigManager().putKVConfig(
- requestHeader.getNamespace(),
- requestHeader.getKey(),
- requestHeader.getValue()
+ requestHeader.getNamespace(),
+ requestHeader.getKey(),
+ requestHeader.getValue()
);
response.setCode(ResponseCode.SUCCESS);
@@ -136,13 +142,13 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
- final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
+ final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader)response.readCustomHeader();
final GetKVConfigRequestHeader requestHeader =
- (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);
+ (GetKVConfigRequestHeader)request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);
String value = this.namesrvController.getKvConfigManager().getKVConfig(
- requestHeader.getNamespace(),
- requestHeader.getKey()
+ requestHeader.getNamespace(),
+ requestHeader.getKey()
);
if (value != null) {
@@ -160,11 +166,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteKVConfigRequestHeader requestHeader =
- (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);
+ (DeleteKVConfigRequestHeader)request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);
this.namesrvController.getKvConfigManager().deleteKVConfig(
- requestHeader.getNamespace(),
- requestHeader.getKey()
+ requestHeader.getNamespace(),
+ requestHeader.getKey()
);
response.setCode(ResponseCode.SUCCESS);
@@ -173,11 +179,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
}
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
- final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+ final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
- (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+ (RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
@@ -189,19 +195,18 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
}
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
- requestHeader.getClusterName(),
- requestHeader.getBrokerAddr(),
- requestHeader.getBrokerName(),
- requestHeader.getBrokerId(),
- requestHeader.getHaServerAddr(),
- registerBrokerBody.getTopicConfigSerializeWrapper(),
- registerBrokerBody.getFilterServerList(),
- ctx.channel());
+ requestHeader.getClusterName(),
+ requestHeader.getBrokerAddr(),
+ requestHeader.getBrokerName(),
+ requestHeader.getBrokerId(),
+ requestHeader.getHaServerAddr(),
+ registerBrokerBody.getTopicConfigSerializeWrapper(),
+ registerBrokerBody.getFilterServerList(),
+ ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
-
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
@@ -212,9 +217,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
- final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+ final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
- (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+ (RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
@@ -226,20 +231,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
}
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
- requestHeader.getClusterName(),
- requestHeader.getBrokerAddr(),
- requestHeader.getBrokerName(),
- requestHeader.getBrokerId(),
- requestHeader.getHaServerAddr(),
- topicConfigWrapper,
- null,
- ctx.channel()
+ requestHeader.getClusterName(),
+ requestHeader.getBrokerAddr(),
+ requestHeader.getBrokerName(),
+ requestHeader.getBrokerId(),
+ requestHeader.getHaServerAddr(),
+ topicConfigWrapper,
+ null,
+ ctx.channel()
);
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
-
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
@@ -250,13 +254,13 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final UnRegisterBrokerRequestHeader requestHeader =
- (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
+ (UnRegisterBrokerRequestHeader)request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
this.namesrvController.getRouteInfoManager().unregisterBroker(
- requestHeader.getClusterName(),
- requestHeader.getBrokerAddr(),
- requestHeader.getBrokerName(),
- requestHeader.getBrokerId());
+ requestHeader.getClusterName(),
+ requestHeader.getBrokerAddr(),
+ requestHeader.getBrokerName(),
+ requestHeader.getBrokerId());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
@@ -266,15 +270,15 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
- (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+ (GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
- this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
- requestHeader.getTopic());
+ this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
+ requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
@@ -287,7 +291,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
- + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+ + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
@@ -304,16 +308,16 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class);
- final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader();
+ final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader)response.readCustomHeader();
final WipeWritePermOfBrokerRequestHeader requestHeader =
- (WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);
+ (WipeWritePermOfBrokerRequestHeader)request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);
int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
log.info("wipe write perm of broker[{}], client: {}, {}",
- requestHeader.getBrokerName(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- wipeTopicCnt);
+ requestHeader.getBrokerName(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ wipeTopicCnt);
responseHeader.setWipeTopicCount(wipeTopicCnt);
response.setCode(ResponseCode.SUCCESS);
@@ -335,7 +339,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteTopicInNamesrvRequestHeader requestHeader =
- (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
+ (DeleteTopicInNamesrvRequestHeader)request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());
@@ -347,10 +351,10 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetKVListByNamespaceRequestHeader requestHeader =
- (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);
+ (GetKVListByNamespaceRequestHeader)request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(
- requestHeader.getNamespace());
+ requestHeader.getNamespace());
if (null != jsonValue) {
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
@@ -366,7 +370,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetTopicsByClusterRequestHeader requestHeader =
- (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
+ (GetTopicsByClusterRequestHeader)request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
@@ -376,7 +380,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
-
private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -388,7 +391,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
-
private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -400,7 +402,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
-
private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -412,9 +413,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
-
private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
index 47e1dc9..f4bbf24 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
@@ -6,51 +6,45 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.namesrv.routeinfo;
+import io.netty.channel.Channel;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.ChannelEventListener;
-import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class BrokerHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
-
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
-
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
-
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
-
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
-
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 82b4cbf..e440e61 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -16,6 +16,18 @@
*/
package org.apache.rocketmq.namesrv.routeinfo;
+import io.netty.channel.Channel;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -30,17 +42,9 @@ import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-
public class RouteInfoManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
@@ -51,7 +55,6 @@ public class RouteInfoManager {
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
@@ -97,20 +100,19 @@ public class RouteInfoManager {
}
public RegisterBrokerResult registerBroker(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final Channel channel) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
-
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
@@ -120,7 +122,6 @@ public class RouteInfoManager {
boolean registerFirst = false;
-
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
@@ -134,13 +135,12 @@ public class RouteInfoManager {
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
-
if (null != topicConfigWrapper //
- && MixAll.MASTER_ID == brokerId) {
+ && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
- || registerFirst) {
+ || registerFirst) {
ConcurrentHashMap<String, TopicConfig> tcTable =
- topicConfigWrapper.getTopicConfigTable();
+ topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
@@ -149,18 +149,16 @@ public class RouteInfoManager {
}
}
-
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
- new BrokerLiveInfo(
- System.currentTimeMillis(),
- topicConfigWrapper.getDataVersion(),
- channel,
- haServerAddr));
+ new BrokerLiveInfo(
+ System.currentTimeMillis(),
+ topicConfigWrapper.getDataVersion(),
+ channel,
+ haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
}
-
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
@@ -169,7 +167,6 @@ public class RouteInfoManager {
}
}
-
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
@@ -224,7 +221,7 @@ public class RouteInfoManager {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
- queueData);
+ queueData);
it.remove();
}
}
@@ -274,18 +271,18 @@ public class RouteInfoManager {
}
public void unregisterBroker(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId) {
try {
try {
this.lock.writeLock().lockInterruptibly();
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
if (brokerLiveInfo != null) {
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
- brokerLiveInfo != null ? "OK" : "Failed",
- brokerAddr
+ brokerLiveInfo != null ? "OK" : "Failed",
+ brokerAddr
);
}
@@ -296,14 +293,14 @@ public class RouteInfoManager {
if (null != brokerData) {
String addr = brokerData.getBrokerAddrs().remove(brokerId);
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
- addr != null ? "OK" : "Failed",
- brokerAddr
+ addr != null ? "OK" : "Failed",
+ brokerAddr
);
if (brokerData.getBrokerAddrs().isEmpty()) {
this.brokerAddrTable.remove(brokerName);
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
- brokerName
+ brokerName
);
removeBrokerName = true;
@@ -315,13 +312,13 @@ public class RouteInfoManager {
if (nameSet != null) {
boolean removed = nameSet.remove(brokerName);
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
- removed ? "OK" : "Failed",
- brokerName);
+ removed ? "OK" : "Failed",
+ brokerName);
if (nameSet.isEmpty()) {
this.clusterAddrTable.remove(clusterName);
log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
- clusterName
+ clusterName
);
}
}
@@ -377,7 +374,6 @@ public class RouteInfoManager {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
-
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
@@ -389,8 +385,8 @@ public class RouteInfoManager {
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData();
brokerDataClone.setBrokerName(brokerData.getBrokerName());
- brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData
- .getBrokerAddrs().clone());
+ brokerDataClone.setBrokerAddrs((HashMap<Long, String>)brokerData
+ .getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
@@ -439,7 +435,7 @@ public class RouteInfoManager {
try {
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
- this.brokerLiveTable.entrySet().iterator();
+ this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
@@ -461,7 +457,6 @@ public class RouteInfoManager {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
-
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try {
@@ -472,7 +467,7 @@ public class RouteInfoManager {
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
- this.brokerAddrTable.entrySet().iterator();
+ this.brokerAddrTable.entrySet().iterator();
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
@@ -485,7 +480,7 @@ public class RouteInfoManager {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
- brokerId, brokerAddr);
+ brokerId, brokerAddr);
break;
}
}
@@ -494,7 +489,7 @@ public class RouteInfoManager {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
- brokerData.getBrokerName());
+ brokerData.getBrokerName());
}
}
@@ -507,12 +502,11 @@ public class RouteInfoManager {
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
- brokerNameFound, clusterName);
-
+ brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
- clusterName);
+ clusterName);
it.remove();
}
@@ -523,7 +517,7 @@ public class RouteInfoManager {
if (removeBrokerName) {
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
- this.topicQueueTable.entrySet().iterator();
+ this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey();
@@ -535,14 +529,14 @@ public class RouteInfoManager {
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
- topic, queueData);
+ topic, queueData);
}
}
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
- topic);
+ topic);
}
}
}
@@ -603,7 +597,6 @@ public class RouteInfoManager {
}
}
-
public byte[] getSystemTopicList() {
TopicList topicList = new TopicList();
try {
@@ -644,7 +637,7 @@ public class RouteInfoManager {
Set<String> brokerNameSet = this.clusterAddrTable.get(cluster);
for (String brokerName : brokerNameSet) {
Iterator<Entry<String, List<QueueData>>> topicTableIt =
- this.topicQueueTable.entrySet().iterator();
+ this.topicQueueTable.entrySet().iterator();
while (topicTableIt.hasNext()) {
Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
String topic = topicEntry.getKey();
@@ -673,13 +666,13 @@ public class RouteInfoManager {
try {
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, List<QueueData>>> topicTableIt =
- this.topicQueueTable.entrySet().iterator();
+ this.topicQueueTable.entrySet().iterator();
while (topicTableIt.hasNext()) {
Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
String topic = topicEntry.getKey();
List<QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) {
+ && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) {
topicList.getTopicList().add(topic);
}
}
@@ -699,13 +692,13 @@ public class RouteInfoManager {
try {
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, List<QueueData>>> topicTableIt =
- this.topicQueueTable.entrySet().iterator();
+ this.topicQueueTable.entrySet().iterator();
while (topicTableIt.hasNext()) {
Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
String topic = topicEntry.getKey();
List<QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
+ && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
topicList.getTopicList().add(topic);
}
}
@@ -725,14 +718,14 @@ public class RouteInfoManager {
try {
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, List<QueueData>>> topicTableIt =
- this.topicQueueTable.entrySet().iterator();
+ this.topicQueueTable.entrySet().iterator();
while (topicTableIt.hasNext()) {
Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
String topic = topicEntry.getKey();
List<QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- && !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())
- && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
+ && !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())
+ && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
topicList.getTopicList().add(topic);
}
}
@@ -747,66 +740,55 @@ public class RouteInfoManager {
}
}
-
class BrokerLiveInfo {
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
-
public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
- String haServerAddr) {
+ String haServerAddr) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
this.dataVersion = dataVersion;
this.channel = channel;
this.haServerAddr = haServerAddr;
}
-
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
-
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
-
public DataVersion getDataVersion() {
return dataVersion;
}
-
public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}
-
public Channel getChannel() {
return channel;
}
-
public void setChannel(Channel channel) {
this.channel = channel;
}
-
public String getHaServerAddr() {
return haServerAddr;
}
-
public void setHaServerAddr(String haServerAddr) {
this.haServerAddr = haServerAddr;
}
-
@Override
public String toString() {
return "BrokerLiveInfo [lastUpdateTimestamp=" + lastUpdateTimestamp + ", dataVersion=" + dataVersion
- + ", channel=" + channel + ", haServerAddr=" + haServerAddr + "]";
+ + ", channel=" + channel + ", haServerAddr=" + haServerAddr + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/pom.xml
----------------------------------------------------------------------
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 6e75c54..15f4443 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
index ba93f09..98cbb53 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/ChannelEventListener.java
@@ -6,31 +6,27 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting;
import io.netty.channel.Channel;
-
/**
*
*/
public interface ChannelEventListener {
void onChannelConnect(final String remoteAddr, final Channel channel);
-
void onChannelClose(final String remoteAddr, final Channel channel);
-
void onChannelException(final String remoteAddr, final Channel channel);
-
void onChannelIdle(final String remoteAddr, final Channel channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java
index de7d3b0..bd1d122 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/CommandCustomHeader.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
public interface CommandCustomHeader {
void checkFields() throws RemotingCommandException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
index 3db5f69..c3bcd87 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
-
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
index c489f1d..c118180 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
@@ -19,11 +19,9 @@ package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
public interface RPCHook {
void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
-
void doAfterResponse(final String remoteAddr, final RemotingCommand request,
- final RemotingCommand response);
+ final RemotingCommand response);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 5f96a34..6c7f7a9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -6,16 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -23,10 +25,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-
/**
*
*/
@@ -34,28 +32,22 @@ public interface RemotingClient extends RemotingService {
public void updateNameServerAddressList(final List<String> addrs);
-
public List<String> getNameServerAddressList();
-
public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
- final long timeoutMillis) throws InterruptedException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException;
-
+ final long timeoutMillis) throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException;
public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
- final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
- RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
-
+ final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
+ RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
- RemotingTimeoutException, RemotingSendRequestException;
-
+ throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
+ RemotingTimeoutException, RemotingSendRequestException;
public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
- final ExecutorService executor);
-
+ final ExecutorService executor);
public boolean isChannelWriteable(final String addr);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index 98270ec..d0b13fc 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -16,16 +16,14 @@
*/
package org.apache.rocketmq.remoting;
+import io.netty.channel.Channel;
+import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.Channel;
-
-import java.util.concurrent.ExecutorService;
-
/**
*
@@ -33,30 +31,24 @@ import java.util.concurrent.ExecutorService;
public interface RemotingServer extends RemotingService {
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
- final ExecutorService executor);
-
+ final ExecutorService executor);
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
-
int localListenPort();
-
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
-
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
- final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
- RemotingTimeoutException;
-
+ final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
+ RemotingTimeoutException;
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
- final InvokeCallback invokeCallback) throws InterruptedException,
- RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
-
+ final InvokeCallback invokeCallback) throws InterruptedException,
+ RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
- RemotingSendRequestException;
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
+ RemotingSendRequestException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
index 1af2b16..50e89d0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
@@ -20,9 +20,7 @@ package org.apache.rocketmq.remoting;
public interface RemotingService {
void start();
-
void shutdown();
-
void registerRPCHook(RPCHook rpcHook);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java
index b552057..fabc06b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/annotation/CFNullable.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.annotation;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
index 180348c..2f2fc77 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.common;
@@ -20,28 +20,23 @@ public class Pair<T1, T2> {
private T1 object1;
private T2 object2;
-
public Pair(T1 object1, T2 object2) {
this.object1 = object1;
this.object2 = object2;
}
-
public T1 getObject1() {
return object1;
}
-
public void setObject1(T1 object1) {
this.object1 = object1;
}
-
public T2 getObject2() {
return object2;
}
-
public void setObject2(T2 object2) {
this.object2 = object2;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 4300537..8d189e7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -6,28 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.common;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.Channel;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class RemotingHelper {
public static final String ROCKETMQ_REMOTING = "RocketmqRemoting";
@@ -56,8 +54,8 @@ public class RemotingHelper {
}
public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
- final long timeoutMillis) throws InterruptedException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException {
+ final long timeoutMillis) throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
long beginTime = System.currentTimeMillis();
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
@@ -69,8 +67,7 @@ public class RemotingHelper {
socketChannel.configureBlocking(true);
//bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
- socketChannel.socket().setSoTimeout((int) timeoutMillis);
-
+ socketChannel.socket().setSoTimeout((int)timeoutMillis);
ByteBuffer byteBufferRequest = request.encode();
while (byteBufferRequest.hasRemaining()) {
@@ -86,7 +83,6 @@ public class RemotingHelper {
throw new RemotingSendRequestException(addr);
}
-
Thread.sleep(1);
}
@@ -106,7 +102,6 @@ public class RemotingHelper {
throw new RemotingTimeoutException(addr, timeoutMillis);
}
-
Thread.sleep(1);
}
@@ -125,11 +120,9 @@ public class RemotingHelper {
throw new RemotingTimeoutException(addr, timeoutMillis);
}
-
Thread.sleep(1);
}
-
byteBufferBody.flip();
return RemotingCommand.decode(byteBufferBody);
} catch (IOException e) {
@@ -152,7 +145,6 @@ public class RemotingHelper {
}
}
-
public static String parseChannelRemoteAddr(final Channel channel) {
if (null == channel) {
return "";
@@ -172,19 +164,17 @@ public class RemotingHelper {
return "";
}
-
public static String parseChannelRemoteName(final Channel channel) {
if (null == channel) {
return "";
}
- final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
+ final InetSocketAddress remote = (InetSocketAddress)channel.remoteAddress();
if (remote != null) {
return remote.getAddress().getHostName();
}
return "";
}
-
public static String parseSocketAddressAddr(SocketAddress socketAddress) {
if (socketAddress != null) {
final String addr = socketAddress.toString();
@@ -196,10 +186,9 @@ public class RemotingHelper {
return "";
}
-
public static String parseSocketAddressName(SocketAddress socketAddress) {
- final InetSocketAddress addrs = (InetSocketAddress) socketAddress;
+ final InetSocketAddress addrs = (InetSocketAddress)socketAddress;
if (addrs != null) {
return addrs.getAddress().getHostName();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 005471e..bcc2232 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -6,22 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.common;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.Inet6Address;
@@ -36,7 +33,8 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Enumeration;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemotingUtil {
public static final String OS_NAME = System.getProperty("os.name");
@@ -69,7 +67,7 @@ public class RemotingUtil {
try {
final Method method = providerClazz.getMethod("provider");
if (method != null) {
- final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null);
+ final SelectorProvider selectorProvider = (SelectorProvider)method.invoke(null);
if (selectorProvider != null) {
result = selectorProvider.openSelector();
}
@@ -141,7 +139,6 @@ public class RemotingUtil {
return null;
}
-
public static String normalizeHostAddress(final InetAddress localHost) {
if (localHost instanceof Inet6Address) {
return "[" + localHost.getHostAddress() + "]";
@@ -156,22 +153,19 @@ public class RemotingUtil {
return isa;
}
-
public static String socketAddress2String(final SocketAddress addr) {
StringBuilder sb = new StringBuilder();
- InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
+ InetSocketAddress inetSocketAddress = (InetSocketAddress)addr;
sb.append(inetSocketAddress.getAddress().getHostAddress());
sb.append(":");
sb.append(inetSocketAddress.getPort());
return sb.toString();
}
-
public static SocketChannel connect(SocketAddress remote) {
return connect(remote, 1000 * 5);
}
-
public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
SocketChannel sc = null;
try {
@@ -197,14 +191,13 @@ public class RemotingUtil {
return null;
}
-
public static void closeChannel(Channel channel) {
final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);
channel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,
- future.isSuccess());
+ future.isSuccess());
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
index 7734f86..c8d594e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
@@ -6,30 +6,27 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.remoting.common;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
-
public class SemaphoreReleaseOnlyOnce {
private final AtomicBoolean released = new AtomicBoolean(false);
private final Semaphore semaphore;
-
public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
this.semaphore = semaphore;
}
-
public void release() {
if (this.semaphore != null) {
if (this.released.compareAndSet(false, true)) {
@@ -38,7 +35,6 @@ public class SemaphoreReleaseOnlyOnce {
}
}
-
public Semaphore getSemaphore() {
return semaphore;
}