You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:15:01 UTC
[70/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 84a3af6..82fe8f4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -16,19 +16,15 @@
*/
package org.apache.rocketmq.store.ha;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.CommitLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.*;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -36,7 +32,13 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.CommitLog;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HAService {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -56,38 +58,33 @@ public class HAService {
private final HAClient haClient;
-
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
- new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+ new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}
-
public void updateMasterAddress(final String newAddr) {
if (this.haClient != null) {
this.haClient.updateMasterAddress(newAddr);
}
}
-
public void putRequest(final CommitLog.GroupCommitRequest request) {
this.groupTransferService.putRequest(request);
}
-
public boolean isSlaveOK(final long masterPutWhere) {
boolean result = this.connectionCount.get() > 0;
result =
- result
- && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
- .getMessageStoreConfig().getHaSlaveFallbehindMax());
+ result
+ && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
+ .getMessageStoreConfig().getHaSlaveFallbehindMax());
return result;
}
-
/**
*/
@@ -103,12 +100,10 @@ public class HAService {
}
}
-
public AtomicInteger getConnectionCount() {
return connectionCount;
}
-
// public void notifyTransferSome() {
// this.groupTransferService.notifyTransferSome();
// }
@@ -120,21 +115,18 @@ public class HAService {
this.haClient.start();
}
-
public void addConnection(final HAConnection conn) {
synchronized (this.connectionList) {
this.connectionList.add(conn);
}
}
-
public void removeConnection(final HAConnection conn) {
synchronized (this.connectionList) {
this.connectionList.remove(conn);
}
}
-
public void shutdown() {
this.haClient.shutdown();
this.acceptSocketService.shutdown(true);
@@ -142,7 +134,6 @@ public class HAService {
this.groupTransferService.shutdown();
}
-
public void destroyConnections() {
synchronized (this.connectionList) {
for (HAConnection c : this.connectionList) {
@@ -153,12 +144,10 @@ public class HAService {
}
}
-
public DefaultMessageStore getDefaultMessageStore() {
return defaultMessageStore;
}
-
public WaitNotifyObject getWaitNotifyObject() {
return waitNotifyObject;
}
@@ -171,9 +160,9 @@ public class HAService {
* Listens to slave connections to create {@link HAConnection}.
*/
class AcceptSocketService extends ServiceThread {
+ private final SocketAddress socketAddressListen;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
- private final SocketAddress socketAddressListen;
public AcceptSocketService(final int port) {
this.socketAddressListen = new InetSocketAddress(port);
@@ -181,6 +170,7 @@ public class HAService {
/**
* Starts listening to slave connections.
+ *
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
@@ -199,8 +189,7 @@ public class HAService {
try {
this.serverSocketChannel.close();
this.selector.close();
- }
- catch (IOException e) {
+ } catch (IOException e) {
log.error("AcceptSocketService shutdown exception", e);
}
}
@@ -218,11 +207,11 @@ public class HAService {
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
- SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+ SocketChannel sc = ((ServerSocketChannel)k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
- + sc.socket().getRemoteSocketAddress());
+ + sc.socket().getRemoteSocketAddress());
try {
HAConnection conn = new HAConnection(HAService.this, sc);
@@ -264,7 +253,6 @@ public class HAService {
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
-
public void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this) {
this.requestsWrite.add(request);
@@ -274,19 +262,16 @@ public class HAService {
}
}
-
public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}
-
private void swapRequests() {
List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
-
private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
@@ -307,7 +292,6 @@ public class HAService {
}
}
-
public void run() {
log.info(this.getServiceName() + " service started");
@@ -323,13 +307,11 @@ public class HAService {
log.info(this.getServiceName() + " service end");
}
-
@Override
protected void onWaitEnd() {
this.swapRequests();
}
-
@Override
public String getServiceName() {
return GroupTransferService.class.getSimpleName();
@@ -349,12 +331,10 @@ public class HAService {
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
-
public HAClient() throws IOException {
this.selector = RemotingUtil.openSelector();
}
-
public void updateMasterAddress(final String newAddr) {
String currentAddr = this.masterAddress.get();
if (currentAddr == null || !currentAddr.equals(newAddr)) {
@@ -363,17 +343,15 @@ public class HAService {
}
}
-
private boolean isTimeToReportOffset() {
long interval =
- HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
+ HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
- .getHaSendHeartbeatInterval();
+ .getHaSendHeartbeatInterval();
return needHeart;
}
-
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
@@ -386,7 +364,7 @@ public class HAService {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
- + "reportSlaveMaxOffset this.socketChannel.write exception", e);
+ + "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
@@ -394,7 +372,6 @@ public class HAService {
return !this.reportOffset.hasRemaining();
}
-
// private void reallocateByteBuffer() {
// ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// int remain = this.byteBufferRead.limit() - this.dispatchPostion;
@@ -423,14 +400,12 @@ public class HAService {
this.dispatchPostion = 0;
}
-
private void swapByteBuffer() {
ByteBuffer tmp = this.byteBufferRead;
this.byteBufferRead = this.byteBufferBackup;
this.byteBufferBackup = tmp;
}
-
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
@@ -462,7 +437,6 @@ public class HAService {
return true;
}
-
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
@@ -475,22 +449,19 @@ public class HAService {
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
-
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
- + slavePhyOffset + " MASTER: " + masterPhyOffset);
+ + slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
-
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
this.byteBufferRead.get(bodyData);
-
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
@@ -514,7 +485,6 @@ public class HAService {
return true;
}
-
private boolean reportSlaveMaxOffsetPlus() {
boolean result = true;
long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
@@ -530,7 +500,6 @@ public class HAService {
return result;
}
-
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
@@ -553,7 +522,6 @@ public class HAService {
return this.socketChannel != null;
}
-
private void closeMaster() {
if (null != this.socketChannel) {
try {
@@ -581,7 +549,6 @@ public class HAService {
}
}
-
@Override
public void run() {
log.info(this.getServiceName() + " service started");
@@ -597,10 +564,8 @@ public class HAService {
}
}
-
this.selector.select(1000);
-
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
@@ -610,14 +575,13 @@ public class HAService {
continue;
}
-
long interval =
- HAService.this.getDefaultMessageStore().getSystemClock().now()
- - this.lastWriteTimestamp;
+ HAService.this.getDefaultMessageStore().getSystemClock().now()
+ - this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
- .getHaHousekeepingInterval()) {
+ .getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
- + "] expired, " + interval);
+ + "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
@@ -633,7 +597,6 @@ public class HAService {
log.info(this.getServiceName() + " service end");
}
-
//
// private void disableWriteFlag() {
// if (this.socketChannel != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
index c059e10..a96af5e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
@@ -6,27 +6,25 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.ha;
import java.util.HashMap;
-
public class WaitNotifyObject {
protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable =
- new HashMap<Long, Boolean>(16);
+ new HashMap<Long, Boolean>(16);
protected volatile boolean hasNotified = false;
-
public void wakeup() {
synchronized (this) {
if (!this.hasNotified) {
@@ -36,7 +34,6 @@ public class WaitNotifyObject {
}
}
-
protected void waitForRunning(long interval) {
synchronized (this) {
if (this.hasNotified) {
@@ -56,7 +53,6 @@ public class WaitNotifyObject {
}
}
-
protected void onWaitEnd() {
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
index d6a223d..de1a31d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
@@ -6,28 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.index;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.store.MappedFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.List;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.MappedFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class IndexFile {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -41,11 +39,10 @@ public class IndexFile {
private final MappedByteBuffer mappedByteBuffer;
private final IndexHeader indexHeader;
-
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
- final long endPhyOffset, final long endTimestamp) throws IOException {
+ final long endPhyOffset, final long endTimestamp) throws IOException {
int fileTotalSize =
- IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
+ IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
this.mappedFile = new MappedFile(fileName, fileTotalSize);
this.fileChannel = this.mappedFile.getFileChannel();
this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
@@ -66,17 +63,14 @@ public class IndexFile {
}
}
-
public String getFileName() {
return this.mappedFile.getFileName();
}
-
public void load() {
this.indexHeader.load();
}
-
public void flush() {
long beginTime = System.currentTimeMillis();
if (this.mappedFile.hold()) {
@@ -114,10 +108,8 @@ public class IndexFile {
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
-
timeDiff = timeDiff / 1000;
-
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
@@ -127,19 +119,16 @@ public class IndexFile {
}
int absIndexPos =
- IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
- + this.indexHeader.getIndexCount() * indexSize;
-
+ IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ + this.indexHeader.getIndexCount() * indexSize;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
- this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
+ this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int)timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
-
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
-
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
@@ -192,22 +181,22 @@ public class IndexFile {
public boolean isTimeMatched(final long begin, final long end) {
boolean result =
- begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
+ begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
result =
- result
- || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader
- .getEndTimestamp());
+ result
+ || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader
+ .getEndTimestamp());
result =
- result
- || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader
- .getEndTimestamp());
+ result
+ || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader
+ .getEndTimestamp());
return result;
}
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
- final long begin, final long end, boolean lock) {
+ final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
@@ -227,25 +216,24 @@ public class IndexFile {
// }
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
- || this.indexHeader.getIndexCount() <= 1) {
+ || this.indexHeader.getIndexCount() <= 1) {
// TODO NOTFOUND
} else {
- for (int nextIndexToRead = slotValue;;) {
+ for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
- IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
- + nextIndexToRead * indexSize;
+ IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ + nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
- long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
+ long timeDiff = (long)this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
-
if (timeDiff < 0) {
break;
}
@@ -260,8 +248,8 @@ public class IndexFile {
}
if (prevIndexRead <= invalidIndex
- || prevIndexRead > this.indexHeader.getIndexCount()
- || prevIndexRead == nextIndexToRead || timeRead < begin) {
+ || prevIndexRead > this.indexHeader.getIndexCount()
+ || prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
index 130f08e..0c00abd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
/**
*
@@ -43,12 +42,10 @@ public class IndexHeader {
private AtomicInteger indexCount = new AtomicInteger(1);
-
public IndexHeader(final ByteBuffer byteBuffer) {
this.byteBuffer = byteBuffer;
}
-
public void load() {
this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
@@ -72,67 +69,55 @@ public class IndexHeader {
this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
}
-
public long getBeginTimestamp() {
return beginTimestamp.get();
}
-
public void setBeginTimestamp(long beginTimestamp) {
this.beginTimestamp.set(beginTimestamp);
this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp);
}
-
public long getEndTimestamp() {
return endTimestamp.get();
}
-
public void setEndTimestamp(long endTimestamp) {
this.endTimestamp.set(endTimestamp);
this.byteBuffer.putLong(endTimestampIndex, endTimestamp);
}
-
public long getBeginPhyOffset() {
return beginPhyOffset.get();
}
-
public void setBeginPhyOffset(long beginPhyOffset) {
this.beginPhyOffset.set(beginPhyOffset);
this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset);
}
-
public long getEndPhyOffset() {
return endPhyOffset.get();
}
-
public void setEndPhyOffset(long endPhyOffset) {
this.endPhyOffset.set(endPhyOffset);
this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset);
}
-
public AtomicInteger getHashSlotCount() {
return hashSlotCount;
}
-
public void incHashSlotCount() {
int value = this.hashSlotCount.incrementAndGet();
this.byteBuffer.putInt(hashSlotcountIndex, value);
}
-
public int getIndexCount() {
return indexCount.get();
}
-
public void incIndexCount() {
int value = this.indexCount.incrementAndGet();
this.byteBuffer.putInt(indexCountIndex, value);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
index 939ba2d..b72ffe9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.store.index;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
@@ -26,39 +33,25 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-
public class IndexService {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ /** Maximum times to attempt index file creation. */
+ private static final int MAX_TRY_IDX_CREATE = 3;
private final DefaultMessageStore defaultMessageStore;
-
private final int hashSlotNum;
private final int indexNum;
private final String storePath;
-
private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- /** Maximum times to attempt index file creation. */
- private static final int MAX_TRY_IDX_CREATE = 3;
-
-
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
- StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
+ StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
-
public boolean load(final boolean lastExitOK) {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
@@ -72,7 +65,7 @@ public class IndexService {
if (!lastExitOK) {
if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
- .getIndexMsgTimestamp()) {
+ .getIndexMsgTimestamp()) {
f.destroy(0);
continue;
}
@@ -113,7 +106,7 @@ public class IndexService {
if (files != null) {
List<IndexFile> fileList = new ArrayList<IndexFile>();
for (int i = 0; i < (files.length - 1); i++) {
- IndexFile f = (IndexFile) files[i];
+ IndexFile f = (IndexFile)files[i];
if (f.getEndPhyOffset() < offset) {
fileList.add(f);
} else {
@@ -145,7 +138,6 @@ public class IndexService {
}
}
-
public void destroy() {
try {
this.readWriteLock.readLock().lock();
@@ -160,7 +152,6 @@ public class IndexService {
}
}
-
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
List<Long> phyOffsets = new ArrayList<Long>(maxNum);
@@ -183,7 +174,6 @@ public class IndexService {
f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
}
-
if (f.getBeginTimestamp() < begin) {
break;
}
@@ -202,12 +192,10 @@ public class IndexService {
return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}
-
private String buildKey(final String topic, final String key) {
return topic + "#" + key;
}
-
public void buildIndex(DispatchRequest req) {
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
@@ -255,7 +243,6 @@ public class IndexService {
}
}
-
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
@@ -300,7 +287,6 @@ public class IndexService {
return indexFile;
}
-
public IndexFile getAndCreateLastIndexFile() {
IndexFile indexFile = null;
IndexFile prevIndexFile = null;
@@ -323,15 +309,14 @@ public class IndexService {
this.readWriteLock.readLock().unlock();
}
-
if (indexFile == null) {
try {
String fileName =
- this.storePath + File.separator
- + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
+ this.storePath + File.separator
+ + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
indexFile =
- new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
- lastUpdateIndexTimestamp);
+ new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
+ lastUpdateIndexTimestamp);
this.readWriteLock.writeLock().lock();
this.indexFileList.add(indexFile);
} catch (Exception e) {
@@ -340,7 +325,6 @@ public class IndexService {
this.readWriteLock.writeLock().unlock();
}
-
if (indexFile != null) {
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@@ -358,7 +342,6 @@ public class IndexService {
return indexFile;
}
-
public void flush(final IndexFile f) {
if (null == f)
return;
@@ -377,12 +360,10 @@ public class IndexService {
}
}
-
public void start() {
}
-
public void shutdown() {
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java b/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
index e126aee..a864c89 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java
@@ -6,43 +6,38 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.index;
import java.util.List;
-
public class QueryOffsetResult {
private final List<Long> phyOffsets;
private final long indexLastUpdateTimestamp;
private final long indexLastUpdatePhyoffset;
-
public QueryOffsetResult(List<Long> phyOffsets, long indexLastUpdateTimestamp,
- long indexLastUpdatePhyoffset) {
+ long indexLastUpdatePhyoffset) {
this.phyOffsets = phyOffsets;
this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset;
}
-
public List<Long> getPhyOffsets() {
return phyOffsets;
}
-
public long getIndexLastUpdateTimestamp() {
return indexLastUpdateTimestamp;
}
-
public long getIndexLastUpdatePhyoffset() {
return indexLastUpdatePhyoffset;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
index b1520e1..ebe3ffe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
@@ -6,34 +6,30 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.schedule;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
/**
*
*/
public class DelayOffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
- new ConcurrentHashMap<Integer, Long>(32);
-
+ new ConcurrentHashMap<Integer, Long>(32);
public ConcurrentHashMap<Integer, Long> getOffsetTable() {
return offsetTable;
}
-
public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) {
this.offsetTable = offsetTable;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index c03c181..3df4806 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -6,16 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.schedule;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -24,16 +31,16 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.running.RunningStats;
-import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
/**
*
*/
@@ -45,10 +52,10 @@ public class ScheduleMessageService extends ConfigManager {
private static final long DELAY_FOR_A_PERIOD = 10000L;
private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
- new ConcurrentHashMap<Integer, Long>(32);
+ new ConcurrentHashMap<Integer, Long>(32);
private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
- new ConcurrentHashMap<Integer, Long>(32);
+ new ConcurrentHashMap<Integer, Long>(32);
private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
@@ -56,7 +63,6 @@ public class ScheduleMessageService extends ConfigManager {
private int maxDelayLevel;
-
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
}
@@ -65,6 +71,10 @@ public class ScheduleMessageService extends ConfigManager {
return queueId + 1;
}
+ public static int delayLevel2QueueId(final int delayLevel) {
+ return delayLevel - 1;
+ }
+
public void buildRunningStats(HashMap<String, String> stats) {
Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
@@ -78,16 +88,10 @@ public class ScheduleMessageService extends ConfigManager {
}
}
- public static int delayLevel2QueueId(final int delayLevel) {
- return delayLevel - 1;
- }
-
-
private void updateOffset(int delayLevel, long offset) {
this.offsetTable.put(delayLevel, offset);
}
-
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
Long time = this.delayLevelTable.get(delayLevel);
if (time != null) {
@@ -97,7 +101,6 @@ public class ScheduleMessageService extends ConfigManager {
return storeTimestamp + 1000;
}
-
public void start() {
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
@@ -126,17 +129,14 @@ public class ScheduleMessageService extends ConfigManager {
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
-
public void shutdown() {
this.timer.cancel();
}
-
public int getMaxDelayLevel() {
return maxDelayLevel;
}
-
public String encode() {
return this.encode(false);
}
@@ -150,14 +150,14 @@ public class ScheduleMessageService extends ConfigManager {
@Override
public String configFilePath() {
return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
- .getStorePathRootDir());
+ .getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
- DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
+ DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
if (delayOffsetSerializeWrapper != null) {
this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
}
@@ -206,13 +206,11 @@ public class ScheduleMessageService extends ConfigManager {
private final int delayLevel;
private final long offset;
-
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
-
@Override
public void run() {
try {
@@ -221,11 +219,10 @@ public class ScheduleMessageService extends ConfigManager {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
- this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
+ this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
-
/**
*
@@ -243,11 +240,10 @@ public class ScheduleMessageService extends ConfigManager {
return result;
}
-
public void executeOnTimeup() {
ConsumeQueue cq =
- ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
- delayLevel2QueueId(delayLevel));
+ ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
+ delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
@@ -262,7 +258,6 @@ public class ScheduleMessageService extends ConfigManager {
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
-
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
@@ -272,31 +267,29 @@ public class ScheduleMessageService extends ConfigManager {
if (countdown <= 0) {
MessageExt msgExt =
- ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
- offsetPy, sizePy);
+ ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
+ offsetPy, sizePy);
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
- ScheduleMessageService.this.defaultMessageStore
- .putMessage(msgInner);
+ ScheduleMessageService.this.defaultMessageStore
+ .putMessage(msgInner);
if (putMessageResult != null
- && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+ && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
- }
-
- else {
+ } else {
// XXX: warn and notify me
log.error(
- "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
- msgExt.getTopic(), msgExt.getMsgId());
+ "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
+ msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
- new DeliverDelayedMessageTimerTask(this.delayLevel,
- nextOffset), DELAY_FOR_A_PERIOD);
+ new DeliverDelayedMessageTimerTask(this.delayLevel,
+ nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
- nextOffset);
+ nextOffset);
return;
}
} catch (Exception e) {
@@ -307,17 +300,15 @@ public class ScheduleMessageService extends ConfigManager {
*/
log.error(
- "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
- + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
- + offsetPy + ",sizePy=" + sizePy, e);
+ "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ + offsetPy + ",sizePy=" + sizePy, e);
}
}
- }
-
- else {
+ } else {
ScheduleMessageService.this.timer.schedule(
- new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
- countdown);
+ new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
+ countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
@@ -325,7 +316,7 @@ public class ScheduleMessageService extends ConfigManager {
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
- this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
+ this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
@@ -342,16 +333,15 @@ public class ScheduleMessageService extends ConfigManager {
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
- + cqMinOffset + ", queueId=" + cq.getQueueId());
+ + cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
- failScheduleOffset), DELAY_FOR_A_WHILE);
+ failScheduleOffset), DELAY_FOR_A_WHILE);
}
-
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setBody(msgExt.getBody());
@@ -360,7 +350,7 @@ public class ScheduleMessageService extends ConfigManager {
TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
- MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
+ MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
index dd4f6df..cd87b0d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.stats;
@@ -21,7 +21,6 @@ import org.apache.rocketmq.store.DefaultMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class BrokerStats {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final DefaultMessageStore defaultMessageStore;
@@ -34,12 +33,10 @@ public class BrokerStats {
private volatile long msgGetTotalTodayMorning;
-
public BrokerStats(DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
}
-
/**
*/
@@ -48,60 +45,50 @@ public class BrokerStats {
this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
this.msgPutTotalTodayMorning =
- this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+ this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
this.msgGetTotalTodayMorning =
- this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
+ this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
}
-
public long getMsgPutTotalYesterdayMorning() {
return msgPutTotalYesterdayMorning;
}
-
public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
}
-
public long getMsgPutTotalTodayMorning() {
return msgPutTotalTodayMorning;
}
-
public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
}
-
public long getMsgGetTotalYesterdayMorning() {
return msgGetTotalYesterdayMorning;
}
-
public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
}
-
public long getMsgGetTotalTodayMorning() {
return msgGetTotalTodayMorning;
}
-
public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
}
-
public long getMsgPutTotalTodayNow() {
return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
}
-
public long getMsgGetTotalTodayNow() {
return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index b14780b..f128b09 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -6,16 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.stats;
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.stats.MomentStatsItemSet;
@@ -24,11 +27,6 @@ import org.apache.rocketmq.common.stats.StatsItemSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-
public class BrokerStatsManager {
public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
@@ -65,9 +63,9 @@ public class BrokerStatsManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME);
private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerStatsThread"));
+ "BrokerStatsThread"));
private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "CommercialStatsThread"));
+ "CommercialStatsThread"));
private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
private final String clusterName;
private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log);
@@ -89,7 +87,6 @@ public class BrokerStatsManager {
this.statsTable.put(BROKER_GET_FROM_DISK_NUMS, new StatsItemSet(BROKER_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(BROKER_GET_FROM_DISK_SIZE, new StatsItemSet(BROKER_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));
-
this.statsTable.put(COMMERCIAL_SEND_TIMES, new StatsItemSet(COMMERCIAL_SEND_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
this.statsTable.put(COMMERCIAL_RCV_TIMES, new StatsItemSet(COMMERCIAL_RCV_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
this.statsTable.put(COMMERCIAL_SEND_SIZE, new StatsItemSet(COMMERCIAL_SEND_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
@@ -154,42 +151,36 @@ public class BrokerStatsManager {
this.statsTable.get(GROUP_GET_LATENCY).addValue(statsKey, incValue, 1);
}
-
public void incBrokerPutNums() {
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
}
-
public void incBrokerGetNums(final int incValue) {
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
}
-
public void incSendBackNums(final String group, final String topic) {
final String statsKey = buildStatsKey(topic, group);
this.statsTable.get(SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
}
-
public double tpsGroupGetNums(final String group, final String topic) {
final String statsKey = buildStatsKey(topic, group);
return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
}
-
public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) {
final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
}
-
public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) {
final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
}
public void incCommercialValue(final String key, final String owner, final String group,
- final String topic, final String type, final int incValue) {
+ final String topic, final String type, final int incValue) {
final String statsKey = buildCommercialStatsKey(owner, topic, group, type);
this.statsTable.get(key).addValue(statsKey, incValue, 1);
}
@@ -206,7 +197,6 @@ public class BrokerStatsManager {
return strBuilder.toString();
}
-
public enum StatsType {
SEND_SUCCESS,
SEND_FAILURE,
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
index ee9e68b..dc5d6a9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
+++ b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store.util;
@@ -22,9 +22,8 @@ import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;
-
public interface LibC extends Library {
- LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
+ LibC INSTANCE = (LibC)Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
int MADV_WILLNEED = 3;
int MADV_DONTNEED = 4;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index bfcb33e..5c6bde2 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -6,17 +6,21 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -26,17 +30,11 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.junit.Assert.assertTrue;
-
public class DefaultMessageStoreTest {
private static final Logger logger = LoggerFactory.getLogger(DefaultMessageStoreTest.class);
-
+
private static final String StoreMessage = "Once, there was a chance for me!";
private static int QUEUE_TOTAL = 100;
@@ -82,7 +80,7 @@ public class DefaultMessageStoreTest {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
}
-
+
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
@@ -133,7 +131,7 @@ public class DefaultMessageStoreTest {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
}
-
+
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
@@ -142,7 +140,7 @@ public class DefaultMessageStoreTest {
assertTrue(result != null);
result.release();
logger.debug("read " + i + " OK");
-
+
}
} finally {
master.shutdown();
@@ -150,9 +148,9 @@ public class DefaultMessageStoreTest {
}
logger.debug("================================================================");
}
-
+
private class MyMessageArrivingListener implements MessageArrivingListener {
-
+
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
// Do nothing here
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
index ac1e016..018cf99 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -20,12 +22,17 @@
*/
package org.apache.rocketmq.store;
-import org.junit.*;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class MappedFileQueueTest {
private static final Logger logger = LoggerFactory.getLogger(MappedFileQueueTest.class);
@@ -55,7 +62,7 @@ public class MappedFileQueueTest {
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
- new MappedFileQueue("target/unit_test_store/a/", 1024, null);
+ new MappedFileQueue("target/unit_test_store/a/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
@@ -73,7 +80,6 @@ public class MappedFileQueueTest {
logger.debug("MappedFileQueue.getLastMappedFile() OK");
}
-
@Test
public void test_findMapedFileByOffset() {
// four-byte string.
@@ -81,7 +87,7 @@ public class MappedFileQueueTest {
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
- new MappedFileQueue("target/unit_test_store/b/", 1024, null);
+ new MappedFileQueue("target/unit_test_store/b/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
@@ -96,23 +102,23 @@ public class MappedFileQueueTest {
MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
-
+
mappedFile = mappedFileQueue.findMappedFileByOffset(100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
-
+
mappedFile = mappedFileQueue.findMappedFileByOffset(1024);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024);
-
+
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024);
-
+
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
-
+
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
@@ -135,7 +141,7 @@ public class MappedFileQueueTest {
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
- new MappedFileQueue("target/unit_test_store/c/", 1024, null);
+ new MappedFileQueue("target/unit_test_store/c/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
@@ -148,27 +154,27 @@ public class MappedFileQueueTest {
boolean result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere());
-
+
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere());
-
+
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere());
-
+
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere());
-
+
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere());
-
+
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere());
-
+
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.flush() OK");
@@ -180,7 +186,7 @@ public class MappedFileQueueTest {
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
- new MappedFileQueue("target/unit_test_store/d/", 1024, null);
+ new MappedFileQueue("target/unit_test_store/d/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
index bfa09e9..a10f4eb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -20,6 +22,7 @@
*/
package org.apache.rocketmq.store;
+import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -27,16 +30,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
import static org.junit.Assert.assertTrue;
-
public class MappedFileTest {
-
+
private static final Logger logger = LoggerFactory.getLogger(MappedFileTest.class);
-
-
+
private static final String StoreMessage = "Once, there was a chance for me!";
@BeforeClass
@@ -54,15 +53,15 @@ public class MappedFileTest {
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
logger.debug("write OK");
-
+
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
byte[] data = new byte[StoreMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
-
+
logger.debug("Read: " + readString);
assertTrue(readString.equals(StoreMessage));
-
+
mappedFile.shutdown(1000);
assertTrue(!mappedFile.isAvailable());
selectMappedBufferResult.release();
@@ -76,11 +75,11 @@ public class MappedFileTest {
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
logger.debug("write OK");
-
+
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
selectMappedBufferResult.release();
mappedFile.shutdown(1000);
-
+
byte[] data = new byte[StoreMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
index f0245a9..2fe4d46 100644
--- a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -20,15 +22,13 @@
*/
package org.apache.rocketmq.store;
+import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-
import static org.junit.Assert.assertTrue;
-
public class StoreCheckpointTest {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -47,7 +47,7 @@ public class StoreCheckpointTest {
storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp);
storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp);
storeCheckpoint.flush();
-
+
long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp();
assertTrue(diff == 3000);
storeCheckpoint.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
index 95cf0c8..cdecb20 100644
--- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -20,16 +22,14 @@
*/
package org.apache.rocketmq.store.index;
-import org.junit.Test;
-
import java.util.ArrayList;
import java.util.List;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
public class IndexFileTest {
private static final int HASH_SLOT_NUM = 100;
private static final int INDEX_NUM = 400;
@@ -45,15 +45,14 @@ public class IndexFileTest {
// put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
-
+
indexFile.destroy(0);
}
-
@Test
public void test_put_get_index() throws Exception {
IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
-
+
for (long i = 0; i < (INDEX_NUM - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
@@ -62,7 +61,7 @@ public class IndexFileTest {
// put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
-
+
final List<Long> phyOffsets = new ArrayList<Long>();
indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
assertFalse(phyOffsets.isEmpty());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
index 7ac8233..c5d9756 100644
--- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -20,19 +22,21 @@
*/
package org.apache.rocketmq.store.schedule;
-import org.apache.rocketmq.store.*;
-import org.apache.rocketmq.store.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.junit.Assert.assertTrue;
@Ignore
@@ -65,7 +69,6 @@ public class ScheduleMessageTest {
long totalMsgs = 10000;
QUEUE_TOTAL = 32;
-
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
@@ -79,7 +82,6 @@ public class ScheduleMessageTest {
boolean load = master.load();
assertTrue(load);
-
master.start();
for (int i = 0; i < totalMsgs; i++) {
MessageExtBrokerInner msg = buildMessage();
@@ -92,7 +94,6 @@ public class ScheduleMessageTest {
System.out.println("write message over, wait time up");
Thread.sleep(1000 * 20);
-
for (long i = 0; i < totalMsgs; i++) {
try {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
@@ -110,10 +111,8 @@ public class ScheduleMessageTest {
Thread.sleep(1000 * 15);
-
master.shutdown();
-
master.destroy();
System.out.println("================================================================");
}