You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:50 UTC
[25/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
index cd66699..0830842 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.listener;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
index 3c1ef3d..405781b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.listener;
import org.apache.rocketmq.common.message.MessageQueue;
-
/**
* Consumer Orderly consumption context
*
@@ -28,32 +27,26 @@ public class ConsumeOrderlyContext {
private boolean autoCommit = true;
private long suspendCurrentQueueTimeMillis = -1;
-
public ConsumeOrderlyContext(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
-
public boolean isAutoCommit() {
return autoCommit;
}
-
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
-
public MessageQueue getMessageQueue() {
return messageQueue;
}
-
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
-
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
index 2e55d89..0c6c6e6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.listener;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
index 99083b4..5de4ded 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.listener;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
index 5d05452..2a9e5c9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.listener;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
index 1c59ef7..c083157 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
@@ -6,20 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.listener;
-import org.apache.rocketmq.common.message.MessageExt;
-
import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageExt;
/**
* A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently
@@ -37,5 +35,5 @@ public interface MessageListenerConcurrently extends MessageListener {
* @return The consume status
*/
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
- final ConsumeConcurrentlyContext context);
+ final ConsumeConcurrentlyContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
index 5de976f..57a553a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java
@@ -6,20 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.listener;
-import org.apache.rocketmq.common.message.MessageExt;
-
import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageExt;
/**
* A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread
@@ -37,5 +35,5 @@ public interface MessageListenerOrderly extends MessageListener {
* @return The consume status
*/
ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
- final ConsumeOrderlyContext context);
+ final ConsumeOrderlyContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
index 218f659..256c639 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
@@ -16,25 +16,22 @@
*/
package org.apache.rocketmq.client.consumer.rebalance;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
-import java.util.ArrayList;
-import java.util.List;
-
-
/**
* Average Hashing queue algorithm
- *
*/
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
+ List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
@@ -48,17 +45,17 @@ public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrate
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ consumerGroup,
+ currentCID,
+ cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
- mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
- + 1 : mqAll.size() / cidAll.size());
+ mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
index d612d4f..5df5cd2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -16,25 +16,22 @@
*/
package org.apache.rocketmq.client.consumer.rebalance;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
-import java.util.ArrayList;
-import java.util.List;
-
-
/**
* Cycle average Hashing queue algorithm
- *
*/
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
+ List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
@@ -48,9 +45,9 @@ public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQue
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ consumerGroup,
+ currentCID,
+ cidAll);
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
index c8fe2d1..387822d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
@@ -6,28 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
+import java.util.List;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
-import java.util.List;
-
-
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
private List<MessageQueue> messageQueueList;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
+ List<String> cidAll) {
return this.messageQueueList;
}
@@ -40,7 +38,6 @@ public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrateg
return messageQueueList;
}
-
public void setMessageQueueList(List<MessageQueue> messageQueueList) {
this.messageQueueList = messageQueueList;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
index adfc124..a154f89 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -6,23 +6,21 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
/**
* Computer room Hashing queue algorithm, such as Alipay logic room
@@ -32,7 +30,7 @@ public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueSt
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
+ List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
@@ -68,7 +66,6 @@ public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueSt
return consumeridcs;
}
-
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index bdaeb58..053ade2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.client.consumer.store;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -27,41 +34,29 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
/**
* Local storage implementation
- *
*/
public class LocalFileOffsetStore implements OffsetStore {
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
- "rocketmq.client.localOffsetStoreDir",
- System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
+ "rocketmq.client.localOffsetStoreDir",
+ System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
private final static Logger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String groupName;
private final String storePath;
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
-
+ new ConcurrentHashMap<MessageQueue, AtomicLong>();
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + //
- this.mQClientFactory.getClientId() + File.separator + //
- this.groupName + File.separator + //
- "offsets.json";
+ this.mQClientFactory.getClientId() + File.separator + //
+ this.groupName + File.separator + //
+ "offsets.json";
}
-
@Override
public void load() throws MQClientException {
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
@@ -71,14 +66,13 @@ public class LocalFileOffsetStore implements OffsetStore {
for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
log.info("load consumer's offset, {} {} {}",
- this.groupName,
- mq,
- offset.get());
+ this.groupName,
+ mq,
+ offset.get());
}
}
}
-
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
@@ -97,7 +91,6 @@ public class LocalFileOffsetStore implements OffsetStore {
}
}
-
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
@@ -134,7 +127,6 @@ public class LocalFileOffsetStore implements OffsetStore {
return -1;
}
-
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
@@ -158,7 +150,6 @@ public class LocalFileOffsetStore implements OffsetStore {
}
}
-
@Override
public void persist(MessageQueue mq) {
}
@@ -170,7 +161,7 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
}
@@ -196,7 +187,7 @@ public class LocalFileOffsetStore implements OffsetStore {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper =
- OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
+ OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception, and try to correct", e);
return this.readLocalOffsetBak();
@@ -212,12 +203,12 @@ public class LocalFileOffsetStore implements OffsetStore {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper =
- OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
+ OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception", e);
throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" //
- + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), //
- e);
+ + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), //
+ e);
}
return offsetSerializeWrapper;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
index a9fadf2..4954f6f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
@@ -6,22 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.store;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
/**
* Wrapper class for offset serialization
@@ -29,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class OffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
+ new ConcurrentHashMap<MessageQueue, AtomicLong>();
public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
return offsetTable;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
index 334f0a1..592796f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.store;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.util.Map;
-import java.util.Set;
-
-
/**
* Offset store interface
*
@@ -37,7 +35,6 @@ public interface OffsetStore {
*/
void load() throws MQClientException;
-
/**
* Update the offset,store it in memory
*
@@ -91,6 +88,6 @@ public interface OffsetStore {
* @param offset
* @param isOneway
*/
- void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException;
+ void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
index c2ee9b7..da16765 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer.store;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 4adc18c..32ef877 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.client.consumer.store;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -29,37 +35,25 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHea
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
/**
* Remote storage implementation
- *
*/
public class RemoteBrokerOffsetStore implements OffsetStore {
private final static Logger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String groupName;
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
-
+ new ConcurrentHashMap<MessageQueue, AtomicLong>();
public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
}
-
@Override
public void load() {
}
-
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
@@ -78,7 +72,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
}
-
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
@@ -117,7 +110,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return -1;
}
-
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
@@ -133,10 +125,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
- this.groupName,
- this.mQClientFactory.getClientId(),
- mq,
- offset.get());
+ this.groupName,
+ this.mQClientFactory.getClientId(),
+ mq,
+ offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
@@ -155,7 +147,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
}
-
@Override
public void persist(MessageQueue mq) {
AtomicLong offset = this.offsetTable.get(mq);
@@ -163,10 +154,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
- this.groupName,
- this.mQClientFactory.getClientId(),
- mq,
- offset.get());
+ this.groupName,
+ this.mQClientFactory.getClientId(),
+ mq,
+ offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
@@ -177,7 +168,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
if (mq != null) {
this.offsetTable.remove(mq);
log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
- offsetTable.size());
+ offsetTable.size());
}
}
@@ -199,7 +190,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
* here need to be optimized.
*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException {
+ MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
}
@@ -209,7 +200,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException {
+ MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
// TODO Here may be heavily overhead for Name Server,need tuning
@@ -226,10 +217,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
- findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+ findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
- findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+ findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
@@ -237,7 +228,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException {
+ InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
// TODO Here may be heavily overhead for Name Server,need tuning
@@ -252,7 +243,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setQueueId(mq.getQueueId());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
- findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+ findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
index ce4bedb..7515a30 100644
--- a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
@@ -6,39 +6,35 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
-
public class MQBrokerException extends Exception {
private static final long serialVersionUID = 5975020272601250368L;
private final int responseCode;
private final String errorMessage;
-
public MQBrokerException(int responseCode, String errorMessage) {
super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
- + errorMessage));
+ + errorMessage));
this.responseCode = responseCode;
this.errorMessage = errorMessage;
}
-
public int getResponseCode() {
return responseCode;
}
-
public String getErrorMessage() {
return errorMessage;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
index 7ffab0d..41a2b59 100644
--- a/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
@@ -6,36 +6,33 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.exception;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
-
public class MQClientException extends Exception {
private static final long serialVersionUID = -5758410930844185841L;
private int responseCode;
private String errorMessage;
-
public MQClientException(String errorMessage, Throwable cause) {
super(FAQUrl.attachDefaultURL(errorMessage), cause);
this.responseCode = -1;
this.errorMessage = errorMessage;
}
-
public MQClientException(int responseCode, String errorMessage) {
super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
- + errorMessage));
+ + errorMessage));
this.responseCode = responseCode;
this.errorMessage = errorMessage;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
index e84beff..cf6c157 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
@@ -21,7 +21,6 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
-
public class CheckForbiddenContext {
private String nameSrvAddr;
private String group;
@@ -34,112 +33,91 @@ public class CheckForbiddenContext {
private Object arg;
private boolean unitMode = false;
-
public String getGroup() {
return group;
}
-
public void setGroup(String group) {
this.group = group;
}
-
public Message getMessage() {
return message;
}
-
public void setMessage(Message message) {
this.message = message;
}
-
public MessageQueue getMq() {
return mq;
}
-
public void setMq(MessageQueue mq) {
this.mq = mq;
}
-
public String getBrokerAddr() {
return brokerAddr;
}
-
public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}
-
public CommunicationMode getCommunicationMode() {
return communicationMode;
}
-
public void setCommunicationMode(CommunicationMode communicationMode) {
this.communicationMode = communicationMode;
}
-
public SendResult getSendResult() {
return sendResult;
}
-
public void setSendResult(SendResult sendResult) {
this.sendResult = sendResult;
}
-
public Exception getException() {
return exception;
}
-
public void setException(Exception exception) {
this.exception = exception;
}
-
public Object getArg() {
return arg;
}
-
public void setArg(Object arg) {
this.arg = arg;
}
-
public boolean isUnitMode() {
return unitMode;
}
-
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
-
public String getNameSrvAddr() {
return nameSrvAddr;
}
-
public void setNameSrvAddr(String nameSrvAddr) {
this.nameSrvAddr = nameSrvAddr;
}
-
@Override
public String toString() {
return "SendMessageContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message
- + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode
- + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode
- + ", arg=" + arg + "]";
+ + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode
+ + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode
+ + ", arg=" + arg + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
index d6f75bb..7faf14b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
@@ -6,23 +6,21 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
import org.apache.rocketmq.client.exception.MQClientException;
-
public interface CheckForbiddenHook {
String hookName();
-
void checkForbidden(final CheckForbiddenContext context) throws MQClientException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
index f141fac..5bababa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
@@ -6,22 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.List;
import java.util.Map;
-
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
public class ConsumeMessageContext {
private String consumerGroup;
@@ -32,72 +30,58 @@ public class ConsumeMessageContext {
private Object mqTraceContext;
private Map<String, String> props;
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public List<MessageExt> getMsgList() {
return msgList;
}
-
public void setMsgList(List<MessageExt> msgList) {
this.msgList = msgList;
}
-
public MessageQueue getMq() {
return mq;
}
-
public void setMq(MessageQueue mq) {
this.mq = mq;
}
-
public boolean isSuccess() {
return success;
}
-
public void setSuccess(boolean success) {
this.success = success;
}
-
public Object getMqTraceContext() {
return mqTraceContext;
}
-
public void setMqTraceContext(Object mqTraceContext) {
this.mqTraceContext = mqTraceContext;
}
-
public Map<String, String> getProps() {
return props;
}
-
public void setProps(Map<String, String> props) {
this.props = props;
}
-
public String getStatus() {
return status;
}
-
public void setStatus(String status) {
this.status = status;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
index 8161d2e..95db7b2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
index 23340d3..bc22546 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
@@ -6,22 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
+import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import java.util.List;
-
-
public class FilterMessageContext {
private String consumerGroup;
private List<MessageExt> msgList;
@@ -29,60 +27,49 @@ public class FilterMessageContext {
private Object arg;
private boolean unitMode;
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public List<MessageExt> getMsgList() {
return msgList;
}
-
public void setMsgList(List<MessageExt> msgList) {
this.msgList = msgList;
}
-
public MessageQueue getMq() {
return mq;
}
-
public void setMq(MessageQueue mq) {
this.mq = mq;
}
-
public Object getArg() {
return arg;
}
-
public void setArg(Object arg) {
this.arg = arg;
}
-
public boolean isUnitMode() {
return unitMode;
}
-
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
-
@Override
public String toString() {
return "ConsumeMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq="
- + mq + ", arg=" + arg + "]";
+ + mq + ", arg=" + arg + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
index 48fd513..095de32 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
public interface FilterMessageHook {
String hookName();
-
void filterMessage(final FilterMessageContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
index bfb4a47..34e22a3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
@@ -6,16 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
+import java.util.Map;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.SendResult;
@@ -23,9 +24,6 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
-import java.util.Map;
-
-
public class SendMessageContext {
private String producerGroup;
private Message message;
@@ -60,97 +58,78 @@ public class SendMessageContext {
return producerGroup;
}
-
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
-
public Message getMessage() {
return message;
}
-
public void setMessage(Message message) {
this.message = message;
}
-
public MessageQueue getMq() {
return mq;
}
-
public void setMq(MessageQueue mq) {
this.mq = mq;
}
-
public String getBrokerAddr() {
return brokerAddr;
}
-
public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}
-
public CommunicationMode getCommunicationMode() {
return communicationMode;
}
-
public void setCommunicationMode(CommunicationMode communicationMode) {
this.communicationMode = communicationMode;
}
-
public SendResult getSendResult() {
return sendResult;
}
-
public void setSendResult(SendResult sendResult) {
this.sendResult = sendResult;
}
-
public Exception getException() {
return exception;
}
-
public void setException(Exception exception) {
this.exception = exception;
}
-
public Object getMqTraceContext() {
return mqTraceContext;
}
-
public void setMqTraceContext(Object mqTraceContext) {
this.mqTraceContext = mqTraceContext;
}
-
public Map<String, String> getProps() {
return props;
}
-
public void setProps(Map<String, String> props) {
this.props = props;
}
-
public String getBornHost() {
return bornHost;
}
-
public void setBornHost(String bornHost) {
this.bornHost = bornHost;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
index c040831..16a86a0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.hook;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index bb008bf..46ce08c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -6,16 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl;
+import io.netty.channel.ChannelHandlerContext;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
@@ -30,29 +34,26 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.common.protocol.header.*;
import org.slf4j.Logger;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final Logger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
-
public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
-
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
@@ -83,7 +84,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
- (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
+ (CheckTransactionStateRequestHeader)request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
@@ -109,10 +110,10 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
try {
final NotifyConsumerIdsChangedRequestHeader requestHeader =
- (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
+ (NotifyConsumerIdsChangedRequestHeader)request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- requestHeader.getConsumerGroup());
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ requestHeader.getConsumerGroup());
this.mqClientFactory.rebalanceImmediately();
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
@@ -122,10 +123,11 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
- (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
+ (ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
- new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
- requestHeader.getTimestamp()});
+ new Object[] {
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
+ requestHeader.getTimestamp()});
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
if (request.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
@@ -139,7 +141,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerStatusRequestHeader requestHeader =
- (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
+ (GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup());
GetConsumerStatusBody body = new GetConsumerStatusBody();
@@ -152,7 +154,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerRunningInfoRequestHeader requestHeader =
- (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+ (GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
if (null != consumerRunningInfo) {
@@ -175,13 +177,13 @@ public class ClientRemotingProcessor implements NettyRequestProcessor {
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
- (ConsumeMessageDirectlyResultRequestHeader) request
- .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
+ (ConsumeMessageDirectlyResultRequestHeader)request
+ .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
ConsumeMessageDirectlyResult result =
- this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());
+ this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());
if (null != result) {
response.setCode(ResponseCode.SUCCESS);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
index 9af6794..07b3a36 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
index 8773f26..c6405d8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl;
@@ -20,18 +20,15 @@ public class FindBrokerResult {
private final String brokerAddr;
private final boolean slave;
-
public FindBrokerResult(String brokerAddr, boolean slave) {
this.brokerAddr = brokerAddr;
this.slave = slave;
}
-
public String getBrokerAddr() {
return brokerAddr;
}
-
public boolean isSlave() {
return slave;
}