You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/28 15:31:11 UTC

incubator-kylin git commit: KYLIN-786, unit test done

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-786 294cb8097 -> 0ef50059b


KYLIN-786, unit test done


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0ef50059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0ef50059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0ef50059

Branch: refs/heads/KYLIN-786
Commit: 0ef50059b8c4cac191bcebc71e50323d7af8d893
Parents: 294cb80
Author: Li, Yang <ya...@ebay.com>
Authored: Thu May 28 21:31:02 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu May 28 21:31:02 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     |  41 ++--
 .../gridtable/memstore/GTMemDiskStore.java      | 204 ++++++++++++-------
 .../storage/gridtable/GTMemDiskStoreTest.java   |  69 +++++++
 .../storage/gridtable/SimpleGridTableTest.java  |  74 ++++---
 4 files changed, 275 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index 36a0998..06d3cd2 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -1,6 +1,7 @@
 package org.apache.kylin.common.util;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,7 +12,7 @@ public class MemoryBudgetController {
         // return number MB released
         int freeUp(int mb);
     }
-    
+
     @SuppressWarnings("serial")
     public static class NotEnoughBudgetException extends IllegalStateException {
     }
@@ -32,17 +33,18 @@ public class MemoryBudgetController {
 
     // all budget numbers are in MB
     private final int totalBudgetMB;
+    private final AtomicInteger totalReservedMB;
     private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
 
-    private int totalReservedMB;
 
     public MemoryBudgetController(int totalBudgetMB) {
-        if (totalBudgetMB <= 0)
+        if (totalBudgetMB < 0)
             throw new IllegalArgumentException();
         if (checkSystemAvailMB(totalBudgetMB) == false)
             throw new IllegalStateException();
 
         this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = new AtomicInteger();
     }
 
     public int getTotalBudgetMB() {
@@ -50,14 +52,17 @@ public class MemoryBudgetController {
     }
 
     public int getTotalReservedMB() {
-        return totalReservedMB;
+        return totalReservedMB.get();
     }
-    
+
     public int getRemainingBudgetMB() {
-        return totalBudgetMB - totalReservedMB;
+        return totalBudgetMB - totalReservedMB.get();
     }
 
     public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+        
         ConsumerEntry entry = booking.get(consumer);
         if (entry == null) {
             booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
@@ -74,35 +79,31 @@ public class MemoryBudgetController {
     }
 
     synchronized private void updateBooking(ConsumerEntry entry, int delta) {
-        totalReservedMB += delta;
-        entry.reservedMB -= delta;
+        totalReservedMB.addAndGet(delta);
+        entry.reservedMB += delta;
         if (entry.reservedMB == 0) {
             booking.remove(entry.consumer);
         }
         if (delta < 0) {
             this.notify();
         }
-        logger.debug(entry + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
     }
 
-    synchronized private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
+    private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
         while (true) {
             // if budget is not enough, try free up 
-            if (delta > totalBudgetMB - totalReservedMB) {
+            while (delta > totalBudgetMB - totalReservedMB.get()) {
                 int freeUpToGo = delta;
                 for (ConsumerEntry entry : booking.values()) {
-                    if (entry.consumer != consumer.consumer) {
-                        int mb = entry.consumer.freeUp(freeUpToGo);
-                        updateBooking(entry, -mb);
-                        freeUpToGo -= mb;
-                        if (freeUpToGo <= 0)
-                            break;
-                    }
+                    int mb = entry.consumer.freeUp(freeUpToGo);
+                    updateBooking(entry, -mb);
+                    freeUpToGo -= mb;
+                    if (freeUpToGo <= 0)
+                        break;
                 }
                 if (freeUpToGo > 0)
                     throw new NotEnoughBudgetException();
-
-                Runtime.getRuntime().gc();
             }
 
             if (checkSystemAvailMB(delta))

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index dfa9cd8..4ae4f5c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -15,7 +15,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.BitSet;
-import java.util.HashSet;
 import java.util.NoSuchElementException;
 
 import org.apache.kylin.common.util.MemoryBudgetController;
@@ -47,10 +46,13 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         this(info, budgetCtrl, diskFile, false);
     }
 
-    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+    private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnExit) throws IOException {
         this.info = info;
         this.memPart = new MemPart(budgetCtrl);
-        this.diskPart = new DiskPart(diskFile, delOnClose);
+        this.diskPart = new DiskPart(diskFile);
+
+        if (delOnExit)
+            diskFile.deleteOnExit();
     }
 
     @Override
@@ -79,17 +81,24 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         diskPart.close();
     }
 
+    @Override
+    public String toString() {
+        return "MemDiskStore@" + this.hashCode();
+    }
+
     private class Reader implements IGTStoreScanner {
 
         final DataInputStream din;
         long diskOffset = 0;
         long memRead = 0;
         long diskRead = 0;
+        int nReadCalls = 0;
 
         GTRowBlock block = GTRowBlock.allocate(info);
         GTRowBlock next = null;
 
-        Reader() {
+        Reader() throws IOException {
+            diskPart.openRead();
             logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
 
             InputStream in = new InputStream() {
@@ -107,6 +116,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
                 @Override
                 public int read(byte[] b, int off, int len) throws IOException {
+                    nReadCalls++;
                     if (available() <= 0)
                         return -1;
 
@@ -138,6 +148,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                         }
                         lenToGo -= n;
                         nRead += n;
+                        off += n;
                         diskOffset += n;
                     }
                     return nRead;
@@ -189,7 +200,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         @Override
         public void close() throws IOException {
             din.close();
-            logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk");
+            diskPart.closeRead();
+            logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
         }
 
     }
@@ -198,11 +210,15 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         final DataOutputStream dout;
         long diskOffset;
+        long memWrite = 0;
+        long diskWrite = 0;
+        int nWriteCalls;
 
         Writer(long startOffset) throws IOException {
-            diskOffset = startOffset;
+            diskOffset = 0; // TODO does not support append yet
             memPart.clear();
             diskPart.clear();
+            diskPart.openWrite(false);
             logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
 
             memPart.activateMemWrite();
@@ -219,19 +235,22 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
                 @Override
                 public void write(byte[] bytes, int offset, int length) throws IOException {
+                    nWriteCalls++;
                     while (length > 0) {
+                        int n;
                         if (memPartActivated) {
-                            int n = memPart.write(bytes, offset, length, diskOffset);
-                            offset += n;
-                            length -= n;
-                            diskOffset += n;
+                            n = memPart.write(bytes, offset, length, diskOffset);
+                            memWrite += n;
                             if (n == 0) {
                                 memPartActivated = false;
                             }
                         } else {
-                            diskPart.write(diskOffset, bytes, offset, length);
-                            diskOffset += length;
+                            n = diskPart.write(diskOffset, bytes, offset, length);
+                            diskWrite += n;
                         }
+                        offset += n;
+                        length -= n;
+                        diskOffset += n;
                     }
                 }
             };
@@ -247,8 +266,9 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         public void close() throws IOException {
             dout.close();
             memPart.finishAsyncFlush();
+            diskPart.closeWrite();
             assert diskOffset == diskPart.tailOffset;
-            logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset);
+            logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
         }
     }
 
@@ -305,33 +325,61 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         synchronized public MemChunk seekMemChunk(long diskOffset) {
             MemChunk c = firstChunk;
             while (c != null && c.headOffset() <= diskOffset) {
-                if (c.tailOffset() < diskOffset)
+                if (diskOffset < c.tailOffset())
                     break;
+                c = c.next;
             }
             return c;
         }
 
-        synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
-            if (writeActivated == false)
-                return 0;
+        public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            int needMoreMem = 0;
 
-            // write is only expected at the tail
-            if (diskOffset != tailOffset())
-                return 0;
-
-            if (chunkCount == 0 || lastChunk.isFull()) {
-                allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
+            synchronized (this) {
                 if (writeActivated == false)
                     return 0;
+
+                // write is only expected at the tail
+                if (diskOffset != tailOffset())
+                    return 0;
+
+                if (chunkCount == 0 || lastChunk.isFull())
+                    needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
             }
 
-            int n = Math.min(lastChunk.freeSpace(), length);
-            System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
-            lastChunk.length += n;
+            // call to budgetCtrl.reserve() out of synchronized block, or deadlock may happen between MemoryConsumers
+            if (needMoreMem > 0) {
+                try {
+                    budgetCtrl.reserve(this, needMoreMem);
+                } catch (NotEnoughBudgetException ex) {
+                    deactivateMemWrite();
+                    return 0;
+                }
+            }
+
+            synchronized (this) {
+                if (needMoreMem > 0) {
+                    MemChunk chunk = new MemChunk();
+                    chunk.diskOffset = diskOffset;
+                    chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+                    if (chunkCount == 0) {
+                        firstChunk = lastChunk = chunk;
+                    } else {
+                        lastChunk.next = chunk;
+                        lastChunk = chunk;
+                    }
+                    chunkCount++;
+                }
+                
+                int n = Math.min(lastChunk.freeSpace(), length);
+                System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+                lastChunk.length += n;
 
-            asyncFlush(lastChunk, diskOffset, n);
+                if (n > 0)
+                    asyncFlush(lastChunk, diskOffset, n);
 
-            return n;
+                return n;
+            }
         }
 
         private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
@@ -365,11 +413,13 @@ public class GTMemDiskStore implements IGTStore, Closeable {
             byte[] data;
             int offset = 0;
             int length = 0;
+            int flushedLen = 0;
 
             while (true) {
                 data = null;
                 synchronized (memPart) {
-                    asyncFlushDiskOffset += length; // bytes written in last loop
+                    asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+                    // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
                     if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
                         asyncFlushChunk = asyncFlushChunk.next;
                     }
@@ -383,31 +433,11 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                 if (data == null)
                     break;
 
-                diskPart.write(asyncFlushDiskOffset, data, offset, length);
-            }
-        }
-
-        private void allocateNewMemChunk(long diskOffset) {
-            try {
-                budgetCtrl.reserve(this, (chunkCount + 1) * MEM_CHUNK_SIZE_MB);
-            } catch (NotEnoughBudgetException ex) {
-                deactivateMemWrite();
-                return;
-            }
-
-            MemChunk chunk = new MemChunk();
-            chunk.diskOffset = diskOffset;
-            chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
-            if (chunkCount == 0) {
-                firstChunk = lastChunk = chunk;
-            } else {
-                lastChunk.next = chunk;
-                lastChunk = chunk;
+                flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
             }
-            chunkCount++;
         }
 
-        synchronized public void finishAsyncFlush() throws IOException {
+        public void finishAsyncFlush() throws IOException {
             deactivateMemWrite();
             if (asyncFlusher != null) {
                 try {
@@ -449,12 +479,12 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         public void activateMemWrite() {
             writeActivated = true;
-            logger.debug(this + " mem write activated");
+            logger.debug(GTMemDiskStore.this + " mem write activated");
         }
 
         public void deactivateMemWrite() {
             writeActivated = false;
-            logger.debug(this + " mem write de-activated");
+            logger.debug(GTMemDiskStore.this + " mem write de-activated");
         }
 
         synchronized public void clear() {
@@ -469,40 +499,74 @@ public class GTMemDiskStore implements IGTStore, Closeable {
             clear();
         }
 
+        @Override
+        public String toString() {
+            return GTMemDiskStore.this.toString();
+        }
+
     }
 
     private class DiskPart implements Closeable {
-        final FileChannel channel;
+        final File diskFile;
+        FileChannel writeChannel;
+        FileChannel readChannel;
         long tailOffset;
 
-        DiskPart(File diskFile, boolean delOnClose) throws IOException {
-            HashSet<StandardOpenOption> opts = new HashSet<StandardOpenOption>(10);
-            opts.add(StandardOpenOption.CREATE);
-            opts.add(StandardOpenOption.APPEND);
-            opts.add(StandardOpenOption.WRITE);
-            opts.add(StandardOpenOption.READ);
-            if (delOnClose)
-                opts.add(StandardOpenOption.DELETE_ON_CLOSE);
-            this.channel = FileChannel.open(diskFile.toPath(), opts);
+        DiskPart(File diskFile) throws IOException {
+            this.diskFile = diskFile;
+            this.tailOffset = diskFile.length();
+            logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+        }
+
+        public void openRead() throws IOException {
+            readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+            tailOffset = diskFile.length();
         }
 
         public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            return channel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+            return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
         }
 
-        void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            channel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
-            tailOffset = Math.max(diskOffset + length, tailOffset);
+        public void closeRead() throws IOException {
+            if (readChannel != null) {
+                readChannel.close();
+                readChannel = null;
+            }
+        }
+
+        public void openWrite(boolean append) throws IOException {
+            if (append) {
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+                tailOffset = diskFile.length();
+            } else {
+                diskFile.delete();
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+                tailOffset = 0;
+            }
+        }
+
+        public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+            tailOffset = Math.max(diskOffset + n, tailOffset);
+            return n;
+        }
+
+        public void closeWrite() throws IOException {
+            if (writeChannel != null) {
+                writeChannel.close();
+                writeChannel = null;
+            }
         }
 
-        void clear() throws IOException {
+        public void clear() throws IOException {
+            diskFile.delete();
             tailOffset = 0;
-            channel.truncate(0);
         }
 
         @Override
         public void close() throws IOException {
-            channel.close();
+            closeWrite();
+            closeRead();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
index dd9668e..b7678b8 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -1,5 +1,74 @@
 package org.apache.kylin.storage.gridtable;
 
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
+import org.junit.Test;
+
 public class GTMemDiskStoreTest {
 
+    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+    final GTInfo info = SimpleGridTableTest.advancedInfo();
+    final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadWriteRead() throws IOException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead();
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+
+        int nThreads = 5;
+        Thread[] t = new Thread[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            t[i] = new Thread() {
+                public void run() {
+                    try {
+                        verifyOneTableWriteAndRead();
+                    } catch (Exception ex) {
+                        ex.printStackTrace();
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    private void verifyOneTableWriteAndRead() throws IOException {
+        GTMemDiskStore store = new GTMemDiskStore(info, budgetCtrl);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table);
+    }
+
+    private void verifyWriteAndRead(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        IGTScanner scanner = table.scan(new GTScanRequest(info));
+        int i = 0;
+        for (GTRecord r : scanner) {
+            assertEquals(data.get(i++), r);
+        }
+        scanner.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 7b676f2..05e61f8 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -4,9 +4,12 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.storage.gridtable.GTInfo.Builder;
 import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
@@ -115,56 +118,81 @@ public class SimpleGridTableTest {
     }
 
     static GTBuilder rebuild(GridTable table) throws IOException {
-        GTRecord r = new GTRecord(table.getInfo());
         GTBuilder builder = table.rebuild();
-
-        builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        for (GTRecord rec : mockupData(table.getInfo(), 10)) {
+            builder.write(rec);
+        }
         builder.close();
 
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
         return builder;
     }
+    
+    static List<GTRecord> mockupData(GTInfo info, int nRows) {
+        List<GTRecord> result = new ArrayList<GTRecord>(nRows);
+        int round = nRows / 10;
+        for (int i = 0; i < round; i++) {
+            String d_01_14 = datePlus("2015-01-14", i * 4);
+            String d_01_15 = datePlus("2015-01-15", i * 4);
+            String d_01_16 = datePlus("2015-01-16", i * 4);
+            String d_01_17 = datePlus("2015-01-17", i * 4);
+            result.add(newRec(info, d_01_14, "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_14, "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_17, "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        }
+        return result;
+    }
+    
+    private static String datePlus(String date, int plusDays) {
+        long millis = DateFormat.stringToMillis(date);
+        millis += (1000L * 3600L * 24L) * plusDays;
+        return DateFormat.formatToDateStr(millis);
+    }
+
+    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
+        GTRecord rec = new GTRecord(info);
+        return rec.setValues(date, name, category, amount, price);
+    }
 
     static void rebuildViaAppend(GridTable table) throws IOException {
-        GTRecord r = new GTRecord(table.getInfo());
+        List<GTRecord> data = mockupData(table.getInfo(), 10);
         GTBuilder builder;
+        int i = 0;
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());