You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/11 12:51:02 UTC
[6/9] incubator-rocketmq git commit: Reformat code globally second time
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
----------------------------------------------------------------------
diff --git a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
index 22eeb86..bfbddf4 100644
--- a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
+++ b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java
@@ -48,7 +48,6 @@ public class FilterSpiTest {
}
}
-
@Test
public void testRegister() {
FilterFactory.INSTANCE.register(new NothingFilter());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
index ff8450e..e459b1a 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -83,7 +83,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return false;
}
- private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final RegisterMessageFilterClassRequestHeader requestHeader =
(RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
@@ -108,7 +109,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
+ private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
+ final RemotingCommand request) throws Exception {
final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
@@ -215,7 +217,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return null;
}
- private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response,
+ private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx,
+ final RemotingCommand response,
final List<MessageExt> msgList) {
if (null != msgList) {
ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()];
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
----------------------------------------------------------------------
diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
index 53521d4..d2adac5 100644
--- a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
+++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
@@ -52,7 +52,6 @@ public class ProducerInstance {
return nameServerAddress + "_" + group;
}
-
public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {
if (StringUtils.isBlank(group)) {
group = DEFAULT_GROUP;
@@ -75,7 +74,6 @@ public class ProducerInstance {
return defaultMQProducer;
}
-
public void removeAndClose(String nameServerAddress, String group) {
if (group == null) {
group = DEFAULT_GROUP;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java
----------------------------------------------------------------------
diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java
index 3fd8d4c..646e924 100644
--- a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java
+++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java
@@ -59,7 +59,6 @@ public class RocketmqLog4jAppender extends AppenderSkeleton {
public RocketmqLog4jAppender() {
}
-
public void activateOptions() {
LogLog.debug("Getting initial context.");
if (!checkEntryConditions()) {
@@ -72,11 +71,8 @@ public class RocketmqLog4jAppender extends AppenderSkeleton {
}
}
-
/**
* Info,error,warn,callback method implementation
- *
- * @param event
*/
public void append(LoggingEvent event) {
if (null == producer) {
@@ -95,7 +91,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton {
} catch (Exception e) {
String msg = new String(data);
errorHandler.error("Could not send message in RocketmqLog4jAppender [" + name + "].Message is :" + msg, e,
- ErrorCode.GENERIC_FAILURE);
+ ErrorCode.GENERIC_FAILURE);
}
}
@@ -145,7 +141,6 @@ public class RocketmqLog4jAppender extends AppenderSkeleton {
return topic;
}
-
public void setTopic(String topic) {
this.topic = topic;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java
----------------------------------------------------------------------
diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java
index 5a6362e..9543f1c 100644
--- a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java
+++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java
@@ -70,10 +70,9 @@ public class RocketmqLog4j2Appender extends AbstractAppender {
*/
private String topic;
-
protected RocketmqLog4j2Appender(String name, Filter filter, Layout<? extends Serializable> layout,
- boolean ignoreExceptions, String nameServerAddress, String producerGroup,
- String topic, String tag) {
+ boolean ignoreExceptions, String nameServerAddress, String producerGroup,
+ String topic, String tag) {
super(name, filter, layout, ignoreExceptions);
this.producer = producer;
this.topic = topic;
@@ -86,15 +85,13 @@ public class RocketmqLog4j2Appender extends AbstractAppender {
ErrorHandler handler = this.getHandler();
if (handler != null) {
handler.error("Starting RocketmqLog4j2Appender [" + this.getName()
- + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
+ + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
}
}
/**
* Info,error,warn,callback method implementation
- *
- * @param event
*/
public void append(LogEvent event) {
if (null == producer) {
@@ -119,10 +116,6 @@ public class RocketmqLog4j2Appender extends AbstractAppender {
/**
* When system exit,this method will be called to close resources
- *
- * @param timeout
- * @param timeUnit
- * @return
*/
public boolean stop(long timeout, TimeUnit timeUnit) {
this.setStopping();
@@ -132,7 +125,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender {
ErrorHandler handler = this.getHandler();
if (handler != null) {
handler.error("Closeing RocketmqLog4j2Appender [" + this.getName()
- + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
+ + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
}
@@ -227,7 +220,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender {
public RocketmqLog4j2Appender build() {
return new RocketmqLog4j2Appender(name, filter, layout, ignoreExceptions,
- nameServerAddress, producerGroup, topic, tag);
+ nameServerAddress, producerGroup, topic, tag);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java
----------------------------------------------------------------------
diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java
index 50ba564..4018cd4 100644
--- a/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java
+++ b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java
@@ -62,8 +62,6 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
/**
* Info,error,warn,callback method implementation
- *
- * @param event
*/
@Override
protected void append(ILoggingEvent event) {
@@ -100,7 +98,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
producer = ProducerInstance.getProducerInstance().getInstance(nameServerAddress, producerGroup);
} catch (Exception e) {
addError("Starting RocketmqLogbackAppender [" + this.getName()
- + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
+ + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
if (producer != null) {
super.start();
@@ -122,7 +120,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup);
} catch (Exception e) {
addError("Closeing RocketmqLogbackAppender [" + this.getName()
- + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
+ + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage());
}
// Help garbage collection
@@ -144,7 +142,6 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
}
}
-
public Layout getLayout() {
return this.layout;
}
@@ -160,7 +157,6 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> {
return tag;
}
-
public void setTag(String tag) {
this.tag = tag;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
----------------------------------------------------------------------
diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
index 9faebb9..38904c0 100644
--- a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
+++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
@@ -30,7 +30,6 @@ import java.lang.reflect.Field;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Basic test rocketmq broker and name server init
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java
----------------------------------------------------------------------
diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java
index 6306ec5..c139283 100644
--- a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java
+++ b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java
@@ -22,8 +22,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
-public abstract class Log4jTest extends AbstractTestCase{
+public abstract class Log4jTest extends AbstractTestCase {
@Before
public abstract void init();
@@ -37,8 +36,8 @@ public abstract class Log4jTest extends AbstractTestCase{
for (int i = 0; i < 10; i++) {
logger.info("log4j " + this.getType() + " simple test message " + i);
}
- int received = consumeMessages(10, "log4j",10);
- Assert.assertTrue(received>5);
+ int received = consumeMessages(10, "log4j", 10);
+ Assert.assertTrue(received > 5);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java
----------------------------------------------------------------------
diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java
index 4f9d3e5..d7ec184 100644
--- a/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java
+++ b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
-public class LogbackTest extends AbstractTestCase{
+public class LogbackTest extends AbstractTestCase {
@Before
public void init() throws JoranException {
@@ -41,7 +41,6 @@ public class LogbackTest extends AbstractTestCase{
StatusPrinter.printInCaseOfErrorsOrWarnings(lc);
}
-
@Test
public void testLogback() throws InterruptedException, MQClientException {
clear();
@@ -49,7 +48,7 @@ public class LogbackTest extends AbstractTestCase{
for (int i = 0; i < 10; i++) {
logger.info("logback test message " + i);
}
- int received = consumeMessages(10, "logback",10);
- Assert.assertTrue(received>=5);
+ int received = consumeMessages(10, "logback", 10);
+ Assert.assertTrue(received >= 5);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java
----------------------------------------------------------------------
diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java
index 4089644..6f6af60 100644
--- a/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java
+++ b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java
@@ -24,14 +24,13 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-public class log4j2Test extends AbstractTestCase{
+public class log4j2Test extends AbstractTestCase {
@Before
public void init() {
Configurator.initialize("log4j2", "src/test/resources/log4j2-example.xml");
}
-
@Test
public void testLog4j2() throws InterruptedException, MQClientException {
clear();
@@ -39,7 +38,7 @@ public class log4j2Test extends AbstractTestCase{
for (int i = 0; i < 10; i++) {
logger.info("log4j2 log message " + i);
}
- int received = consumeMessages(10, "log4j2",10);
- Assert.assertTrue(received>5);
+ int received = consumeMessages(10, "log4j2", 10);
+ Assert.assertTrue(received > 5);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/resources/log4j-example.properties
----------------------------------------------------------------------
diff --git a/logappender/src/test/resources/log4j-example.properties b/logappender/src/test/resources/log4j-example.properties
index 7fdebbb..63b2a98 100644
--- a/logappender/src/test/resources/log4j-example.properties
+++ b/logappender/src/test/resources/log4j-example.properties
@@ -12,23 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
log4j.rootLogger=INFO,stdout
-
log4j.logger.testLogger=INFO,mq
-
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %-4r [%t] (%F:%L) %-5p - %m%n
-
log4j.appender.store=org.apache.log4j.DailyRollingFileAppender
log4j.appender.store.File=${user.home}/logs/rocketmqlogs/appender.log
log4j.appender.store.Append=true
-log4j.appender.store.DatePattern ='_'yyyy-MM-dd'.log'
+log4j.appender.store.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.store.layout=org.apache.log4j.PatternLayout
log4j.appender.store.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
-
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=log
log4j.appender.mq.Topic=TopicTest
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/resources/log4j-example.xml
----------------------------------------------------------------------
diff --git a/logappender/src/test/resources/log4j-example.xml b/logappender/src/test/resources/log4j-example.xml
index b0dc776..6bddde9 100644
--- a/logappender/src/test/resources/log4j-example.xml
+++ b/logappender/src/test/resources/log4j-example.xml
@@ -19,37 +19,37 @@ limitations under the License.
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">
- <param name="Encoding" value="UTF-8" />
- <param name="Target" value="System.out" />
+ <param name="Encoding" value="UTF-8"/>
+ <param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss},%d %-4r [%t] (%F:%L) %-5p - %m%n" />
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss},%d %-4r [%t] (%F:%L) %-5p - %m%n"/>
</layout>
</appender>
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
- <param name="Tag" value="log1" />
- <param name="Topic" value="TopicTest" />
- <param name="ProducerGroup" value="loggerAppender" />
+ <param name="Tag" value="log1"/>
+ <param name="Topic" value="TopicTest"/>
+ <param name="ProducerGroup" value="loggerAppender"/>
<param name="NameServerAddress" value="127.0.0.1:9876"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
+ <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n"/>
</layout>
</appender>
<logger name="testLogger" additivity="false">
- <level value="INFO" />
- <appender-ref ref="mqAppender1" />
- <appender-ref ref="consoleAppender" />
+ <level value="INFO"/>
+ <appender-ref ref="mqAppender1"/>
+ <appender-ref ref="consoleAppender"/>
</logger>
<logger name="consoleLogger" additivity="false">
- <level value="INFO" />
- <appender-ref ref="consoleAppender" />
+ <level value="INFO"/>
+ <appender-ref ref="consoleAppender"/>
</logger>
<root>
- <level value="INFO" />
+ <level value="INFO"/>
<appender-ref ref="consoleAppender"/>
</root>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/resources/log4j2-example.xml
----------------------------------------------------------------------
diff --git a/logappender/src/test/resources/log4j2-example.xml b/logappender/src/test/resources/log4j2-example.xml
index 3ee8a01..c310855 100644
--- a/logappender/src/test/resources/log4j2-example.xml
+++ b/logappender/src/test/resources/log4j2-example.xml
@@ -17,25 +17,25 @@
-->
<Configuration status="warn" name="Rocketmq">
-<Appenders>
- <RocketMQ name="rocketmqAppender" producerGroup="loggerAppender" nameServerAddress="127.0.0.1:9876"
- topic="TopicTest" tag="log">
- <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
- </RocketMQ>
+ <Appenders>
+ <RocketMQ name="rocketmqAppender" producerGroup="loggerAppender" nameServerAddress="127.0.0.1:9876"
+ topic="TopicTest" tag="log">
+ <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
+ </RocketMQ>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
- </Console>
-</Appenders>
-<Loggers>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
- <Logger name="rocketmqLogger" level="info">
- <AppenderRef ref="rocketmqAppender"/>
- </Logger>
+ <Logger name="rocketmqLogger" level="info">
+ <AppenderRef ref="rocketmqAppender"/>
+ </Logger>
- <Root level="debug">
- <AppenderRef ref="Console"/>
- <AppenderRef ref="rocketmqAppender"/>
- </Root>
-</Loggers>
+ <Root level="debug">
+ <AppenderRef ref="Console"/>
+ <AppenderRef ref="rocketmqAppender"/>
+ </Root>
+ </Loggers>
</Configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index 04cf870..f6611b6 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -50,7 +50,8 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
}
@Override
- public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 9647684..ed5b20b 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -63,7 +63,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand processRequest(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
request.getCode(),
@@ -124,7 +125,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return false;
}
- public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand putKVConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final PutKVConfigRequestHeader requestHeader =
(PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
@@ -140,7 +142,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand getKVConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
final GetKVConfigRequestHeader requestHeader =
@@ -163,7 +166,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteKVConfigRequestHeader requestHeader =
(DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);
@@ -215,7 +219,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand registerBroker(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
@@ -251,7 +256,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand unregisterBroker(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final UnRegisterBrokerRequestHeader requestHeader =
(UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
@@ -267,7 +273,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
@@ -306,7 +313,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class);
final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader();
final WipeWritePermOfBrokerRequestHeader requestHeader =
@@ -336,7 +344,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteTopicInNamesrvRequestHeader requestHeader =
(DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
@@ -348,7 +357,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetKVListByNamespaceRequestHeader requestHeader =
(GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);
@@ -367,7 +377,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetTopicsByClusterRequestHeader requestHeader =
(GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
@@ -380,7 +391,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList();
@@ -391,7 +403,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics();
@@ -402,7 +415,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 35790c9..d78ec95 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -276,8 +276,8 @@ public class RouteInfoManager {
this.lock.writeLock().lockInterruptibly();
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
- brokerLiveInfo != null ? "OK" : "Failed",
- brokerAddr
+ brokerLiveInfo != null ? "OK" : "Failed",
+ brokerAddr
);
this.filterServerTable.remove(brokerAddr);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
----------------------------------------------------------------------
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
index 5a98cd7..a0e8137 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
@@ -99,7 +99,8 @@ public class ClusterTestRequestProcessorTest {
@Test
public void testGetRouteInfoByTopic() throws RemotingCommandException {
RemotingCommand request = RemotingCommand.createRequestCommand(12, new CommandCustomHeader() {
- @Override public void checkFields() throws RemotingCommandException {
+ @Override
+ public void checkFields() throws RemotingCommandException {
}
});
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
index c863ccf..3e4bd26 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -77,7 +77,7 @@ public class DefaultPromise<V> implements Promise<V> {
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime > 0) {
- for (;; ) {
+ for (; ; ) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
index ae4d3ed..851c283 100644
--- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
@@ -75,7 +75,7 @@ public class LocalMessageCacheTest {
@Test
public void testSubmitConsumeRequest() throws Exception {
- byte [] body = new byte[]{'1', '2', '3'};
+ byte[] body = new byte[] {'1', '2', '3'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(body);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
index 8436189..323c089 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
@@ -63,7 +63,7 @@ public abstract class ServiceThread implements Runnable {
this.thread.join(this.getJointime());
long eclipseTime = System.currentTimeMillis() - beginTime;
log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
- + this.getJointime());
+ + this.getJointime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b66e7de..6143462 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -90,6 +90,7 @@ public abstract class NettyRemotingAbstract {
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
+ *
* @param permitsOneway Number of permits for one-way requests.
* @param permitsAsync Number of permits for asynchronous requests.
*/
@@ -100,12 +101,14 @@ public abstract class NettyRemotingAbstract {
/**
* Custom channel event listener.
+ *
* @return custom channel event listener if defined; null otherwise.
*/
public abstract ChannelEventListener getChannelEventListener();
/**
* Put a netty event to the executor.
+ *
* @param event Netty event instance.
*/
public void putNettyEvent(final NettyEvent event) {
@@ -116,13 +119,14 @@ public abstract class NettyRemotingAbstract {
* Entry of incoming command processing.
*
* <p>
- * <strong>Note:</strong>
- * The incoming remoting command may be
- * <ul>
- * <li>An inquiry request from a remote peer component;</li>
- * <li>A response to a previous request issued by this very participant.</li>
- * </ul>
+ * <strong>Note:</strong>
+ * The incoming remoting command may be
+ * <ul>
+ * <li>An inquiry request from a remote peer component;</li>
+ * <li>A response to a previous request issued by this very participant.</li>
+ * </ul>
* </p>
+ *
* @param ctx Channel handler context.
* @param msg incoming remoting command.
* @throws Exception if there were any error while processing the incoming command.
@@ -145,6 +149,7 @@ public abstract class NettyRemotingAbstract {
/**
* Process incoming request command issued by remote peer.
+ *
* @param ctx channel handler context.
* @param cmd request command.
*/
@@ -235,6 +240,7 @@ public abstract class NettyRemotingAbstract {
/**
* Process response from remote peer to the previous issued requests.
+ *
* @param ctx channel handler context.
* @param cmd response command instance.
*/
@@ -261,7 +267,6 @@ public abstract class NettyRemotingAbstract {
/**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
- * @param responseFuture
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
@@ -297,12 +302,14 @@ public abstract class NettyRemotingAbstract {
/**
* Custom RPC hook.
+ *
* @return RPC hook if specified; null otherwise.
*/
public abstract RPCHook getRPCHook();
/**
* This method specifies thread pool to use while invoking callback methods.
+ *
* @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the
* netty event-loop thread.
*/
@@ -310,7 +317,7 @@ public abstract class NettyRemotingAbstract {
/**
* <p>
- * This method is periodically invoked to scan and expire deprecated request.
+ * This method is periodically invoked to scan and expire deprecated request.
* </p>
*/
public void scanResponseTable() {
@@ -337,7 +344,8 @@ public abstract class NettyRemotingAbstract {
}
}
- public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+ public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
+ final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index ecf9ab2..34f560f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -322,7 +322,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
if (update) {
Collections.shuffle(addrs);
- log.info("name server address updated. NEW : {} , OLD: {}",addrs,old);
+ log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
this.namesrvAddrList.set(addrs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index b2041b2..7cf82c9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -78,7 +78,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
this(nettyServerConfig, null);
}
- public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
+ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
+ final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index 0570c84..a5e2a23 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.remoting.netty;
-
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888;
private int serverWorkerThreads = 8;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
----------------------------------------------------------------------
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 0f5da6e..e11915b 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -29,7 +29,7 @@ public class RemotingCommandTest {
int source = 261;
SerializeType type = SerializeType.JSON;
byte[] result = RemotingCommand.markProtocolType(source, type);
- assertThat(result).isEqualTo(new byte[]{0, 0, 1, 5});
+ assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5});
}
@Test
@@ -37,7 +37,7 @@ public class RemotingCommandTest {
int source = 16777215;
SerializeType type = SerializeType.ROCKETMQ;
byte[] result = RemotingCommand.markProtocolType(source, type);
- assertThat(result).isEqualTo(new byte[]{1, -1, -1, -1});
+ assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1});
}
@Test
@@ -58,7 +58,7 @@ public class RemotingCommandTest {
int code = RemotingSysResponseCode.SUCCESS;
String remark = "Sample remark";
- RemotingCommand cmd = RemotingCommand.createResponseCommand(code ,remark, SampleCommandCustomHeader.class);
+ RemotingCommand cmd = RemotingCommand.createResponseCommand(code, remark, SampleCommandCustomHeader.class);
assertThat(cmd.getCode()).isEqualTo(code);
assertThat(cmd.getVersion()).isEqualTo(2333);
assertThat(cmd.getRemark()).isEqualTo(remark);
@@ -71,7 +71,7 @@ public class RemotingCommandTest {
int code = RemotingSysResponseCode.SUCCESS;
String remark = "Sample remark";
- RemotingCommand cmd = RemotingCommand.createResponseCommand(code ,remark);
+ RemotingCommand cmd = RemotingCommand.createResponseCommand(code, remark);
assertThat(cmd.getCode()).isEqualTo(code);
assertThat(cmd.getVersion()).isEqualTo(2333);
assertThat(cmd.getRemark()).isEqualTo(remark);
@@ -84,7 +84,7 @@ public class RemotingCommandTest {
int code = RemotingSysResponseCode.SUCCESS;
String remark = "Sample remark";
- RemotingCommand cmd = RemotingCommand.createResponseCommand(code ,remark, CommandCustomHeader.class);
+ RemotingCommand cmd = RemotingCommand.createResponseCommand(code, remark, CommandCustomHeader.class);
assertThat(cmd).isNull();
}
@@ -128,7 +128,7 @@ public class RemotingCommandTest {
int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
CommandCustomHeader header = new SampleCommandCustomHeader();
RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header);
- cmd.setBody(new byte[] { 0, 1, 2, 3, 4});
+ cmd.setBody(new byte[] {0, 1, 2, 3, 4});
ByteBuffer buffer = cmd.encode();
@@ -141,7 +141,7 @@ public class RemotingCommandTest {
RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
- assertThat(decodedCommand.getBody()).isEqualTo(new byte[]{ 0, 1, 2, 3, 4});
+ assertThat(decodedCommand.getBody()).isEqualTo(new byte[] {0, 1, 2, 3, 4});
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
----------------------------------------------------------------------
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
index 38548cd..3e8b7a9 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
@@ -86,7 +86,7 @@ class Sample {
private String stringValue = "string";
private int intValue = 2333;
private Integer integerValue = 666;
- private double[] doubleArray = new double[]{0.618, 1.618};
+ private double[] doubleArray = new double[] {0.618, 1.618};
private List<String> stringList = Arrays.asList("a", "o", "e", "i", "u", "v");
public String getStringValue() {
@@ -136,7 +136,7 @@ class Sample {
if (o == null || getClass() != o.getClass())
return false;
- Sample sample = (Sample)o;
+ Sample sample = (Sample) o;
if (intValue != sample.intValue)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
index 16a62fa..d337638 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
@@ -27,23 +27,17 @@ public interface AppendMessageCallback {
/**
* After message serialization, write MapedByteBuffer
*
- * @param byteBuffer
- * @param maxBlank
- * @param msg
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
- final int maxBlank, final MessageExtBrokerInner msg);
+ final int maxBlank, final MessageExtBrokerInner msg);
/**
* After batched message serialization, write MapedByteBuffer
*
- * @param byteBuffer
- * @param maxBlank
* @param messageExtBatch, backed up by a byte array
- *
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
- final int maxBlank, final MessageExtBatch messageExtBatch);
+ final int maxBlank, final MessageExtBatch messageExtBatch);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index a2cb629..edd68a5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -62,6 +62,7 @@ public class CommitLog {
private volatile long beginTimeInLock = 0;
private final PutMessageLock putMessageLock;
+
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
@@ -77,11 +78,12 @@ public class CommitLog {
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
- @Override protected MessageExtBatchEncoder initialValue() {
+ @Override
+ protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
- this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+ this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
@@ -661,7 +663,7 @@ public class CommitLog {
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+ GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
@@ -758,7 +760,6 @@ public class CommitLog {
putMessageLock.unlock();
}
-
if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, messageExtBatch.getBody().length, result);
}
@@ -773,7 +774,6 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
-
handleDiskFlush(result, putMessageResult, messageExtBatch);
handleHA(result, putMessageResult, messageExtBatch);
@@ -885,8 +885,6 @@ public class CommitLog {
return diff;
}
-
-
abstract class FlushCommitLogService extends ServiceThread {
protected static final int RETRY_TIMES_OVER = 10;
}
@@ -1030,23 +1028,19 @@ public class CommitLog {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile boolean flushOK = false;
-
public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset;
}
-
public long getNextOffset() {
return nextOffset;
}
-
public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK;
this.countDownLatch.countDown();
}
-
public boolean waitForFlush(long timeout) {
try {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
@@ -1430,7 +1424,6 @@ public class CommitLog {
this.maxMessageSize = size;
}
-
public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
msgBatchMemory.clear(); //not thread-safe
int totalMsgLen = 0;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 275334c..379162d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -350,7 +350,7 @@ public class ConsumeQueue {
if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
- this.getMinOffsetInQueue(), this.topic, this.queueId);
+ this.getMinOffsetInQueue(), this.topic, this.queueId);
// This maybe not take effect, when not every consume queue has extend file.
if (isExtAddr(tagsCode)) {
minExtAddr = tagsCode;
@@ -567,9 +567,6 @@ public class ConsumeQueue {
/**
* Check {@code tagsCode} is address of extend file or tags code.
- *
- * @param tagsCode
- * @return
*/
public boolean isExtAddr(long tagsCode) {
return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
index 1a177e9..a118cde 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -58,17 +58,17 @@ public class ConsumeQueueExt {
/**
* Constructor.
*
- * @param topic topic
- * @param queueId id of queue
- * @param storePath root dir of files to store.
+ * @param topic topic
+ * @param queueId id of queue
+ * @param storePath root dir of files to store.
* @param mappedFileSize file size
- * @param bitMapLength bit map length.
+ * @param bitMapLength bit map length.
*/
public ConsumeQueueExt(final String topic,
- final int queueId,
- final String storePath,
- final int mappedFileSize,
- final int bitMapLength) {
+ final int queueId,
+ final String storePath,
+ final int mappedFileSize,
+ final int bitMapLength) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
@@ -94,9 +94,6 @@ public class ConsumeQueueExt {
* <p>
* Just test {@code address} is less than 0.
* </p>
- *
- * @param address
- * @return
*/
public boolean isExtAddr(final long address) {
return address <= MAX_ADDR;
@@ -108,9 +105,6 @@ public class ConsumeQueueExt {
* if {@code address} is less than 0, return {@code address} - {@link java.lang.Long#MIN_VALUE};
* else, just return {@code address}
* </p>
- *
- * @param address
- * @return
*/
public long unDecorate(final long address) {
if (isExtAddr(address)) {
@@ -126,7 +120,6 @@ public class ConsumeQueueExt {
* else, just return {@code offset}
* </p>
*
- * @param offset
* @return ext address(value is less than 0)
*/
public long decorate(final long offset) {
@@ -140,7 +133,6 @@ public class ConsumeQueueExt {
* Get data from buffer.
*
* @param address less than 0
- * @return
*/
public CqExtUnit get(final long address) {
CqExtUnit cqExtUnit = new CqExtUnit();
@@ -154,9 +146,7 @@ public class ConsumeQueueExt {
/**
* Get data from buffer, and set to {@code cqExtUnit}
*
- * @param address less than 0
- * @param cqExtUnit
- * @return
+ * @param address less than 0
*/
public boolean get(final long address, final CqExtUnit cqExtUnit) {
if (!isExtAddr(address)) {
@@ -194,7 +184,6 @@ public class ConsumeQueueExt {
* Be careful, this method is not thread safe.
* </p>
*
- * @param cqExtUnit
* @return success: < 0: fail: >=0
*/
public long put(final CqExtUnit cqExtUnit) {
@@ -259,8 +248,6 @@ public class ConsumeQueueExt {
/**
* Load data from file when startup.
- *
- * @return
*/
public boolean load() {
boolean result = this.mappedFileQueue.load();
@@ -379,9 +366,6 @@ public class ConsumeQueueExt {
/**
* flush buffer to file.
- *
- * @param flushLeastPages
- * @return
*/
public boolean flush(final int flushLeastPages) {
return this.mappedFileQueue.flush(flushLeastPages);
@@ -400,8 +384,6 @@ public class ConsumeQueueExt {
* <p>
* Be careful: it's an address just when invoking this method.
* </p>
- *
- * @return
*/
public long getMaxAddress() {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
@@ -413,8 +395,6 @@ public class ConsumeQueueExt {
/**
* Minus address saved in file.
- *
- * @return
*/
public long getMinAddress() {
MappedFile firstFile = this.mappedFileQueue.getFirstMappedFile();
@@ -435,7 +415,8 @@ public class ConsumeQueueExt {
public static final int MAX_EXT_UNIT_SIZE = Short.MAX_VALUE;
- public CqExtUnit() {}
+ public CqExtUnit() {
+ }
public CqExtUnit(Long tagsCode, long msgStoreTime, byte[] filterBitMap) {
this.tagsCode = tagsCode == null ? 0 : tagsCode;
@@ -468,9 +449,6 @@ public class ConsumeQueueExt {
/**
* build unit from buffer from current position.
- *
- * @param buffer
- * @return
*/
private boolean read(final ByteBuffer buffer) {
if (buffer.position() + 2 > buffer.limit()) {
@@ -507,8 +485,6 @@ public class ConsumeQueueExt {
* <p>
* if size <= 0, nothing to do.
* </p>
- *
- * @param buffer
*/
private void readBySkip(final ByteBuffer buffer) {
ByteBuffer temp = buffer.slice();
@@ -527,9 +503,6 @@ public class ConsumeQueueExt {
* <li>1. @{code container} can be null, it will be created if null.</li>
* <li>2. if capacity of @{code container} is less than unit size, it will be created also.</li>
* <li>3. Pls be sure that size of unit is not greater than {@link #MAX_EXT_UNIT_SIZE}</li>
- *
- * @param container
- * @return
*/
private byte[] write(final ByteBuffer container) {
this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
@@ -557,8 +530,6 @@ public class ConsumeQueueExt {
/**
* Calculate unit size by current data.
- *
- * @return
*/
private int calcUnitSize() {
int sizeTemp = MIN_EXT_UNIT_SIZE + (filterBitMap == null ? 0 : filterBitMap.length);
@@ -600,16 +571,23 @@ public class ConsumeQueueExt {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof CqExtUnit)) return false;
+ if (this == o)
+ return true;
+ if (!(o instanceof CqExtUnit))
+ return false;
CqExtUnit cqExtUnit = (CqExtUnit) o;
- if (bitMapSize != cqExtUnit.bitMapSize) return false;
- if (msgStoreTime != cqExtUnit.msgStoreTime) return false;
- if (size != cqExtUnit.size) return false;
- if (tagsCode != cqExtUnit.tagsCode) return false;
- if (!Arrays.equals(filterBitMap, cqExtUnit.filterBitMap)) return false;
+ if (bitMapSize != cqExtUnit.bitMapSize)
+ return false;
+ if (msgStoreTime != cqExtUnit.msgStoreTime)
+ return false;
+ if (size != cqExtUnit.size)
+ return false;
+ if (tagsCode != cqExtUnit.tagsCode)
+ return false;
+ if (!Arrays.equals(filterBitMap, cqExtUnit.filterBitMap))
+ return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 36c15d4..95a017a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -218,7 +218,6 @@ public class DefaultMessageStore implements MessageStore {
this.shutdown = false;
}
-
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
@@ -577,7 +576,6 @@ public class DefaultMessageStore implements MessageStore {
return getResult;
}
-
public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
@@ -588,7 +586,6 @@ public class DefaultMessageStore implements MessageStore {
return 0;
}
-
public long getMinOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 81cf0f7..492ac5f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -73,7 +73,8 @@ public class MappedFile extends ReferenceResource {
init(fileName, fileSize);
}
- public MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
+ public MappedFile(final String fileName, final int fileSize,
+ final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}
@@ -142,7 +143,8 @@ public class MappedFile extends ReferenceResource {
return TOTAL_MAPPED_VIRTUAL_MEMORY.get();
}
- public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
+ public void init(final String fileName, final int fileSize,
+ final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
@@ -209,7 +211,7 @@ public class MappedFile extends ReferenceResource {
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
- result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt);
+ result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
@@ -217,16 +219,14 @@ public class MappedFile extends ReferenceResource {
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
- log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
+ log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
-
public long getFileFromOffset() {
return this.fileFromOffset;
}
-
public boolean appendMessage(final byte[] data) {
int currentPos = this.wrotePosition.get();
@@ -247,10 +247,8 @@ public class MappedFile extends ReferenceResource {
/**
* Content of data from offset to offset + length will be wrote to file.
*
- * @param data
* @param offset The offset of the subarray to be used.
* @param length The length of the subarray to be used.
- * @return
*/
public boolean appendMessage(final byte[] data, final int offset, final int length) {
int currentPos = this.wrotePosition.get();
@@ -270,7 +268,6 @@ public class MappedFile extends ReferenceResource {
}
/**
- * @param flushLeastPages
* @return The current flushed position
*/
public int flush(final int flushLeastPages) {
@@ -404,7 +401,6 @@ public class MappedFile extends ReferenceResource {
return null;
}
-
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
index dee1bc7..bae7a16 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
@@ -21,5 +21,5 @@ import java.util.Map;
public interface MessageArrivingListener {
void arriving(String topic, int queueId, long logicOffset, long tagsCode,
- long msgStoreTime, byte[] filterBitMap, Map<String, String> properties);
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
index 6b34758..3dd0fee 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
@@ -24,22 +24,20 @@ public interface MessageFilter {
* match by tags code or filter bit map, which is calculated when the message is received
* and stored in consume queue ext.
*
- * @param tagsCode tagsCode
- * @param cqExtUnit extend unit of consume queue
- * @return
+ * @param tagsCode tagsCode
+ * @param cqExtUnit extend unit of consume queue
*/
boolean isMatchedByConsumeQueue(final Long tagsCode,
- final ConsumeQueueExt.CqExtUnit cqExtUnit);
+ final ConsumeQueueExt.CqExtUnit cqExtUnit);
/**
* match by message content which are stored in commit log.
* <br>{@code msgBuffer} and {@code properties} are not both null. If invoked in store,
* {@code properties} is null; if invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null.
*
- * @param msgBuffer message buffer in commit log, may be null if not invoked in store.
- * @param properties message properties, should decode from buffer if null by yourself.
- * @return
+ * @param msgBuffer message buffer in commit log, may be null if not invoked in store.
+ * @param properties message properties, should decode from buffer if null by yourself.
*/
boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
- final Map<String, String> properties);
+ final Map<String, String> properties);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 55572ce..907dfe2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -29,12 +29,14 @@ public interface MessageStore {
/**
* Load previously stored messages.
+ *
* @return true if success; false otherwise.
*/
boolean load();
/**
* Launch this message store.
+ *
* @throws Exception if there is any error.
*/
void start() throws Exception;
@@ -51,6 +53,7 @@ public interface MessageStore {
/**
* Store a message into store.
+ *
* @param msg Message instance to store
* @return result of store operation.
*/
@@ -58,6 +61,7 @@ public interface MessageStore {
/**
* Store a batch of messages.
+ *
* @param messageExtBatch Message batch.
* @return result of storing batch messages.
*/
@@ -80,6 +84,7 @@ public interface MessageStore {
/**
* Get maximum offset of the topic queue.
+ *
* @param topic Topic name.
* @param queueId Queue ID.
* @return Maximum offset at present.
@@ -88,6 +93,7 @@ public interface MessageStore {
/**
* Get the minimum offset of the topic queue.
+ *
* @param topic Topic name.
* @param queueId Queue ID.
* @return Minimum offset at present.
@@ -96,6 +102,7 @@ public interface MessageStore {
/**
* Get the offset of the message in the commit log, which is also known as physical offset.
+ *
* @param topic Topic of the message to lookup.
* @param queueId Queue ID.
* @param consumeQueueOffset offset of consume queue.
@@ -105,6 +112,7 @@ public interface MessageStore {
/**
* Look up the physical offset of the message whose store timestamp is as specified.
+ *
* @param topic Topic of the message.
* @param queueId Queue ID.
* @param timestamp Timestamp to look up.
@@ -114,6 +122,7 @@ public interface MessageStore {
/**
* Look up the message by given commit log offset.
+ *
* @param commitLogOffset physical offset.
* @return Message whose physical offset is as specified.
*/
@@ -121,6 +130,7 @@ public interface MessageStore {
/**
* Get one message from the specified commit log offset.
+ *
* @param commitLogOffset commit log offset.
* @return wrapped result of the message.
*/
@@ -128,6 +138,7 @@ public interface MessageStore {
/**
* Get one message from the specified commit log offset.
+ *
* @param commitLogOffset commit log offset.
* @param msgSize message size.
* @return wrapped result of the message.
@@ -136,30 +147,35 @@ public interface MessageStore {
/**
* Get the running information of this store.
+ *
* @return message store running info.
*/
String getRunningDataInfo();
/**
* Message store runtime information, which should generally contain various statistical information.
+ *
* @return runtime information of the message store in format of key-value pairs.
*/
HashMap<String, String> getRuntimeInfo();
/**
* Get the maximum commit log offset.
+ *
* @return maximum commit log offset.
*/
long getMaxPhyOffset();
/**
* Get the minimum commit log offset.
+ *
* @return minimum commit log offset.
*/
long getMinPhyOffset();
/**
* Get the store time of the earliest message in the given queue.
+ *
* @param topic Topic of the messages to query.
* @param queueId Queue ID to find.
* @return store time of the earliest message.
@@ -168,12 +184,14 @@ public interface MessageStore {
/**
* Get the store time of the earliest message in this store.
+ *
* @return timestamp of the earliest message in this store.
*/
long getEarliestMessageTime();
/**
* Get the store time of the message specified.
+ *
* @param topic message topic.
* @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
@@ -183,6 +201,7 @@ public interface MessageStore {
/**
* Get the total number of the messages in the specified queue.
+ *
* @param topic Topic
* @param queueId Queue ID.
* @return total number.
@@ -191,6 +210,7 @@ public interface MessageStore {
/**
* Get the raw commit log data starting from the given offset, which should be used for replication purposes.
+ *
* @param offset starting offset.
* @return commit log data.
*/
@@ -198,6 +218,7 @@ public interface MessageStore {
/**
* Append data to commit log.
+ *
* @param startOffset starting offset.
* @param data data to append.
* @return true if success; false otherwise.
@@ -211,36 +232,40 @@ public interface MessageStore {
/**
* Query messages by given key.
+ *
* @param topic topic of the message.
* @param key message key.
* @param maxNum maximum number of the messages possible.
* @param begin begin timestamp.
* @param end end timestamp.
- * @return
*/
QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end);
/**
* Update HA master address.
+ *
* @param newAddr new address.
*/
void updateHaMasterAddress(final String newAddr);
/**
* Return how much the slave falls behind.
+ *
* @return number of bytes that slave falls behind.
*/
long slaveFallBehindMuch();
/**
* Return the current timestamp of the store.
+ *
* @return current time in milliseconds since 1970-01-01.
*/
long now();
/**
* Clean unused topics.
+ *
* @param topics all valid topics.
* @return number of the topics deleted.
*/
@@ -253,6 +278,7 @@ public interface MessageStore {
/**
* Check if the given message has been swapped out of the memory.
+ *
* @param topic topic.
* @param queueId queue ID.
* @param consumeOffset consume queue offset.
@@ -262,18 +288,21 @@ public interface MessageStore {
/**
* Get number of the bytes that have been stored in commit log and not yet dispatched to consume queue.
+ *
* @return number of the bytes to dispatch.
*/
long dispatchBehindBytes();
/**
* Flush the message store to persist all data.
+ *
* @return maximum offset flushed to persistent storage device.
*/
long flush();
/**
* Reset written offset.
+ *
* @param phyOffset new offset.
* @return true if success; false otherwise.
*/
@@ -281,42 +310,49 @@ public interface MessageStore {
/**
* Get confirm offset.
+ *
* @return confirm offset.
*/
long getConfirmOffset();
/**
* Set confirm offset.
+ *
* @param phyOffset confirm offset to set.
*/
void setConfirmOffset(long phyOffset);
/**
* Check if the operating system page cache is busy or not.
+ *
* @return true if the OS page cache is busy; false otherwise.
*/
boolean isOSPageCacheBusy();
/**
* Get lock time in milliseconds of the store by far.
+ *
* @return lock time in milliseconds.
*/
long lockTimeMills();
/**
* Check if the transient store pool is deficient.
+ *
* @return true if the transient store pool is running out; false otherwise.
*/
boolean isTransientStorePoolDeficient();
/**
* Get the dispatcher list.
+ *
* @return list of the dispatcher.
*/
LinkedList<CommitLogDispatcher> getDispatcherList();
/**
* Get consume queue of the topic/queue.
+ *
* @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
index a03e41a..758f437 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
@@ -21,5 +21,6 @@ package org.apache.rocketmq.store;
*/
public interface PutMessageLock {
void lock();
+
void unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
index 9198f1c..9aa80d8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.store;
-
import java.util.concurrent.locks.ReentrantLock;
/**
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
index baa809d..39a32cc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
@@ -16,12 +16,10 @@
*/
package org.apache.rocketmq.store;
-
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Spin lock Implementation to put message, suggest using this with low race conditions
- *
*/
public class PutMessageSpinLock implements PutMessageLock {
//true: Can lock, false : in lock.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 3dcd861..7ff11a2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -28,7 +28,6 @@ public class RunningFlags {
private static final int DISK_FULL_BIT = 1 << 4;
-
private volatile int flagBits = 0;
public RunningFlags() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 19ed211..02aa84a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -143,7 +143,6 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;
-
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -605,7 +604,8 @@ public class MessageStoreConfig {
}
/**
- * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is ASYNC_FLUSH
+ * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is
+ * ASYNC_FLUSH
*
* @return <tt>true</tt> or <tt>false</tt>
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index e0c51a1..8b97504 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -78,7 +78,6 @@ public class HAConnection {
return socketChannel;
}
-
class ReadSocketService extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
private final Selector selector;
@@ -191,7 +190,6 @@ public class HAConnection {
}
}
-
class WriteSocketService extends ServiceThread {
private final Selector selector;
private final SocketChannel socketChannel;
@@ -327,7 +325,6 @@ public class HAConnection {
HAConnection.log.info(this.getServiceName() + " service end");
}
-
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header