You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2016/12/27 06:58:25 UTC

[2/4] incubator-rocketmq git commit: [ROCKETMQ-9] Errors in rocketmq-store module.

[ROCKETMQ-9] Errors in rocketmq-store module.

JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-9


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/d1fa8694
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/d1fa8694
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/d1fa8694

Branch: refs/heads/master
Commit: d1fa8694ff82b429dcd811156edf7ea8a702237e
Parents: fed0976
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Dec 26 18:32:00 2016 +0900
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 27 09:50:23 2016 +0800

----------------------------------------------------------------------
 .../store/AllocateMappedFileService.java        |  2 +-
 .../alibaba/rocketmq/store/MappedFileQueue.java | 26 ++++++++++------
 .../alibaba/rocketmq/store/index/IndexFile.java |  5 ++--
 .../rocketmq/store/index/IndexService.java      | 31 +++++++++++---------
 .../rocketmq/store/MappedFileQueueTest.java     | 17 +++++++++--
 .../rocketmq/store/index/IndexFileTest.java     | 18 +++++++-----
 .../src/test/resources/logback-test.xml         |  2 +-
 7 files changed, 63 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
index 40eee7a..06113c8 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
@@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread {
                 isSuccess = true;
             }
         } catch (InterruptedException e) {
-            log.warn(this.getServiceName() + " service has exception, maybe by shutdown");
+            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
             this.hasException = true;
             return false;
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
index 2b006c0..8d9d3ab 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
@@ -459,27 +459,35 @@ public class MappedFileQueue {
         return result;
     }
 
-
+    /**
+     * Finds a mapped file by offset.
+     *
+     * @param offset Offset.
+     * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
+     * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
+     */
     public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
         try {
             MappedFile mappedFile = this.getFirstMappedFile();
             if (mappedFile != null) {
                 int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
                 if (index < 0 || index >= this.mappedFiles.size()) {
-                    LOG_ERROR.warn("findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}",
-                            offset,
-                            index,
-                            this.mappedFileSize,
-                            this.mappedFiles.size(),
-                            UtilAll.currentStackTrace());
+                    LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
+                        "mappedFileSize: {}, mappedFiles count: {}",
+                        mappedFile,
+                        offset,
+                        index,
+                        this.mappedFileSize,
+                        this.mappedFiles.size());
                 }
 
                 try {
                     return this.mappedFiles.get(index);
                 } catch (Exception e) {
-                    if (returnFirstOnNotFound) {
+                    if (returnFirstOnNotFound)
                         return mappedFile;
-                    }
+
+                    LOG_ERROR.warn("findMappedFileByOffset failure. {}", UtilAll.currentStackTrace());
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
index f353320..befa5f9 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
@@ -94,7 +94,6 @@ public class IndexFile {
         return this.indexHeader.getIndexCount() >= this.indexNum;
     }
 
-
     public boolean destroy(final long intervalForcibly) {
         return this.mappedFile.destroy(intervalForcibly);
     }
@@ -167,8 +166,8 @@ public class IndexFile {
                 }
             }
         } else {
-            log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
-                    + this.indexNum);
+            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+                + "; index max num = " + this.indexNum);
         }
 
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
index f275f80..fded747 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -49,6 +49,9 @@ public class IndexService {
     private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
+    /** Maximum times to attempt index file creation. */
+    private static final int MAX_TRY_IDX_CREATE = 3;
+
 
     public IndexService(final DefaultMessageStore store) {
         this.defaultMessageStore = store;
@@ -257,44 +260,44 @@ public class IndexService {
 
 
     private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
-        for (boolean ok =
-             indexFile.putKey(idxKey, msg.getCommitLogOffset(),
-                     msg.getStoreTimestamp()); !ok; ) {
-            log.warn("index file full, so create another one, " + indexFile.getFileName());
+        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
+            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
+
             indexFile = retryGetAndCreateIndexFile();
             if (null == indexFile) {
                 return null;
             }
 
-            ok =
-                    indexFile.putKey(idxKey, msg.getCommitLogOffset(),
-                            msg.getStoreTimestamp());
+            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
         }
+
         return indexFile;
     }
 
-
-    public IndexFile retryGetAndCreateIndexFile() {
+    /**
+     * Retries to get or create index file.
+     *
+     * @return {@link IndexFile} or null on failure.
+     */
+    private IndexFile retryGetAndCreateIndexFile() {
         IndexFile indexFile = null;
 
-
-        for (int times = 0; null == indexFile && times < 3; times++) {
+        for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
             indexFile = this.getAndCreateLastIndexFile();
             if (null != indexFile)
                 break;
 
             try {
-                log.error("try to create index file, " + times + " times");
+                log.error("Tried to create index file " + times + " times");
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
 
-
         if (null == indexFile) {
             this.defaultMessageStore.getAccessRights().makeIndexFileError();
-            log.error("mark index file can not build flag");
+            log.error("Mark index file cannot build flag");
         }
 
         return indexFile;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
index 89b37be..700f1c6 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
@@ -52,6 +52,7 @@ public class MappedFileQueueTest {
     @Test
     public void test_getLastMapedFile() {
         final String fixedMsg = "0123456789abcdef";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/a/", 1024, null);
@@ -59,6 +60,7 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
             if (!result) {
                 logger.debug("appendMessage " + i);
@@ -74,7 +76,9 @@ public class MappedFileQueueTest {
 
     @Test
     public void test_findMapedFileByOffset() {
+        // four-byte string.
         final String fixedMsg = "abcd";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/b/", 1024, null);
@@ -82,11 +86,13 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
-            // logger.debug("appendMessage " + bytes);
             assertTrue(result);
         }
 
+        assertEquals(fixedMsg.getBytes().length * 1024, mappedFileQueue.getMappedMemorySize());
+
         MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 0);
@@ -110,7 +116,8 @@ public class MappedFileQueueTest {
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
-        
+
+        // over mapped memory size.
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4);
         assertTrue(mappedFile == null);
 
@@ -125,6 +132,7 @@ public class MappedFileQueueTest {
     @Test
     public void test_commit() {
         final String fixedMsg = "0123456789abcdef";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/c/", 1024, null);
@@ -132,6 +140,7 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
             assertTrue(result);
         }
@@ -168,6 +177,7 @@ public class MappedFileQueueTest {
     @Test
     public void test_getMapedMemorySize() {
         final String fixedMsg = "abcd";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/d/", 1024, null);
@@ -175,14 +185,15 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
             assertTrue(result);
         }
 
         assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
+
         mappedFileQueue.shutdown(1000);
         mappedFileQueue.destroy();
         logger.debug("MappedFileQueue.getMappedMemorySize() OK");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
index f6bfc0a..9e446f7 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
@@ -31,17 +31,18 @@ import static org.junit.Assert.assertTrue;
 
 
 public class IndexFileTest {
-    private static final int hashSlotNum = 100;
-    private static final int indexNum = 400;
+    private static final int HASH_SLOT_NUM = 100;
+    private static final int INDEX_NUM = 400;
 
     @Test
     public void test_put_index() throws Exception {
-        IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
-        for (long i = 0; i < (indexNum - 1); i++) {
+        IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
+        for (long i = 0; i < (INDEX_NUM - 1); i++) {
             boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
             assertTrue(putResult);
         }
-    
+
+        // put over index file capacity.
         boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
         assertFalse(putResult);
     
@@ -51,12 +52,14 @@ public class IndexFileTest {
 
     @Test
     public void test_put_get_index() throws Exception {
-        IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
+        IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
     
-        for (long i = 0; i < (indexNum - 1); i++) {
+        for (long i = 0; i < (INDEX_NUM - 1); i++) {
             boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
             assertTrue(putResult);
         }
+
+        // put over index file capacity.
         boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
         assertFalse(putResult);
     
@@ -64,6 +67,7 @@ public class IndexFileTest {
         indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
         assertFalse(phyOffsets.isEmpty());
         assertEquals(1, phyOffsets.size());
+
         indexFile.destroy(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/resources/logback-test.xml b/rocketmq-store/src/test/resources/logback-test.xml
index acdfa10..11d429d 100644
--- a/rocketmq-store/src/test/resources/logback-test.xml
+++ b/rocketmq-store/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
     <appender-ref ref="STDOUT" />
   </logger>
 
-  <root level="WARN">
+  <root level="ERROR">
     <appender-ref ref="STDOUT" />
   </root>