You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/28 15:31:11 UTC
incubator-kylin git commit: KYLIN-786, unit test done
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-786 294cb8097 -> 0ef50059b
KYLIN-786, unit test done
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0ef50059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0ef50059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0ef50059
Branch: refs/heads/KYLIN-786
Commit: 0ef50059b8c4cac191bcebc71e50323d7af8d893
Parents: 294cb80
Author: Li, Yang <ya...@ebay.com>
Authored: Thu May 28 21:31:02 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu May 28 21:31:02 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 41 ++--
.../gridtable/memstore/GTMemDiskStore.java | 204 ++++++++++++-------
.../storage/gridtable/GTMemDiskStoreTest.java | 69 +++++++
.../storage/gridtable/SimpleGridTableTest.java | 74 ++++---
4 files changed, 275 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index 36a0998..06d3cd2 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -1,6 +1,7 @@
package org.apache.kylin.common.util;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -11,7 +12,7 @@ public class MemoryBudgetController {
// return number MB released
int freeUp(int mb);
}
-
+
@SuppressWarnings("serial")
public static class NotEnoughBudgetException extends IllegalStateException {
}
@@ -32,17 +33,18 @@ public class MemoryBudgetController {
// all budget numbers are in MB
private final int totalBudgetMB;
+ private final AtomicInteger totalReservedMB;
private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
- private int totalReservedMB;
public MemoryBudgetController(int totalBudgetMB) {
- if (totalBudgetMB <= 0)
+ if (totalBudgetMB < 0)
throw new IllegalArgumentException();
if (checkSystemAvailMB(totalBudgetMB) == false)
throw new IllegalStateException();
this.totalBudgetMB = totalBudgetMB;
+ this.totalReservedMB = new AtomicInteger();
}
public int getTotalBudgetMB() {
@@ -50,14 +52,17 @@ public class MemoryBudgetController {
}
public int getTotalReservedMB() {
- return totalReservedMB;
+ return totalReservedMB.get();
}
-
+
public int getRemainingBudgetMB() {
- return totalBudgetMB - totalReservedMB;
+ return totalBudgetMB - totalReservedMB.get();
}
public void reserve(MemoryConsumer consumer, int requestMB) {
+ if (totalBudgetMB == 0 && requestMB > 0)
+ throw new NotEnoughBudgetException();
+
ConsumerEntry entry = booking.get(consumer);
if (entry == null) {
booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
@@ -74,35 +79,31 @@ public class MemoryBudgetController {
}
synchronized private void updateBooking(ConsumerEntry entry, int delta) {
- totalReservedMB += delta;
- entry.reservedMB -= delta;
+ totalReservedMB.addAndGet(delta);
+ entry.reservedMB += delta;
if (entry.reservedMB == 0) {
booking.remove(entry.consumer);
}
if (delta < 0) {
this.notify();
}
- logger.debug(entry + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+ logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
}
- synchronized private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
+ private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
while (true) {
// if budget is not enough, try free up
- if (delta > totalBudgetMB - totalReservedMB) {
+ while (delta > totalBudgetMB - totalReservedMB.get()) {
int freeUpToGo = delta;
for (ConsumerEntry entry : booking.values()) {
- if (entry.consumer != consumer.consumer) {
- int mb = entry.consumer.freeUp(freeUpToGo);
- updateBooking(entry, -mb);
- freeUpToGo -= mb;
- if (freeUpToGo <= 0)
- break;
- }
+ int mb = entry.consumer.freeUp(freeUpToGo);
+ updateBooking(entry, -mb);
+ freeUpToGo -= mb;
+ if (freeUpToGo <= 0)
+ break;
}
if (freeUpToGo > 0)
throw new NotEnoughBudgetException();
-
- Runtime.getRuntime().gc();
}
if (checkSystemAvailMB(delta))
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index dfa9cd8..4ae4f5c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -15,7 +15,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.BitSet;
-import java.util.HashSet;
import java.util.NoSuchElementException;
import org.apache.kylin.common.util.MemoryBudgetController;
@@ -47,10 +46,13 @@ public class GTMemDiskStore implements IGTStore, Closeable {
this(info, budgetCtrl, diskFile, false);
}
- public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+ private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnExit) throws IOException {
this.info = info;
this.memPart = new MemPart(budgetCtrl);
- this.diskPart = new DiskPart(diskFile, delOnClose);
+ this.diskPart = new DiskPart(diskFile);
+
+ if (delOnExit)
+ diskFile.deleteOnExit();
}
@Override
@@ -79,17 +81,24 @@ public class GTMemDiskStore implements IGTStore, Closeable {
diskPart.close();
}
+ @Override
+ public String toString() {
+ return "MemDiskStore@" + this.hashCode();
+ }
+
private class Reader implements IGTStoreScanner {
final DataInputStream din;
long diskOffset = 0;
long memRead = 0;
long diskRead = 0;
+ int nReadCalls = 0;
GTRowBlock block = GTRowBlock.allocate(info);
GTRowBlock next = null;
- Reader() {
+ Reader() throws IOException {
+ diskPart.openRead();
logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
InputStream in = new InputStream() {
@@ -107,6 +116,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public int read(byte[] b, int off, int len) throws IOException {
+ nReadCalls++;
if (available() <= 0)
return -1;
@@ -138,6 +148,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
}
lenToGo -= n;
nRead += n;
+ off += n;
diskOffset += n;
}
return nRead;
@@ -189,7 +200,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public void close() throws IOException {
din.close();
- logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk");
+ diskPart.closeRead();
+ logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
}
}
@@ -198,11 +210,15 @@ public class GTMemDiskStore implements IGTStore, Closeable {
final DataOutputStream dout;
long diskOffset;
+ long memWrite = 0;
+ long diskWrite = 0;
+ int nWriteCalls;
Writer(long startOffset) throws IOException {
- diskOffset = startOffset;
+ diskOffset = 0; // TODO does not support append yet
memPart.clear();
diskPart.clear();
+ diskPart.openWrite(false);
logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
memPart.activateMemWrite();
@@ -219,19 +235,22 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public void write(byte[] bytes, int offset, int length) throws IOException {
+ nWriteCalls++;
while (length > 0) {
+ int n;
if (memPartActivated) {
- int n = memPart.write(bytes, offset, length, diskOffset);
- offset += n;
- length -= n;
- diskOffset += n;
+ n = memPart.write(bytes, offset, length, diskOffset);
+ memWrite += n;
if (n == 0) {
memPartActivated = false;
}
} else {
- diskPart.write(diskOffset, bytes, offset, length);
- diskOffset += length;
+ n = diskPart.write(diskOffset, bytes, offset, length);
+ diskWrite += n;
}
+ offset += n;
+ length -= n;
+ diskOffset += n;
}
}
};
@@ -247,8 +266,9 @@ public class GTMemDiskStore implements IGTStore, Closeable {
public void close() throws IOException {
dout.close();
memPart.finishAsyncFlush();
+ diskPart.closeWrite();
assert diskOffset == diskPart.tailOffset;
- logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset);
+ logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
}
}
@@ -305,33 +325,61 @@ public class GTMemDiskStore implements IGTStore, Closeable {
synchronized public MemChunk seekMemChunk(long diskOffset) {
MemChunk c = firstChunk;
while (c != null && c.headOffset() <= diskOffset) {
- if (c.tailOffset() < diskOffset)
+ if (diskOffset < c.tailOffset())
break;
+ c = c.next;
}
return c;
}
- synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
- if (writeActivated == false)
- return 0;
+ public int write(byte[] bytes, int offset, int length, long diskOffset) {
+ int needMoreMem = 0;
- // write is only expected at the tail
- if (diskOffset != tailOffset())
- return 0;
-
- if (chunkCount == 0 || lastChunk.isFull()) {
- allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
+ synchronized (this) {
if (writeActivated == false)
return 0;
+
+ // write is only expected at the tail
+ if (diskOffset != tailOffset())
+ return 0;
+
+ if (chunkCount == 0 || lastChunk.isFull())
+ needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
}
- int n = Math.min(lastChunk.freeSpace(), length);
- System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
- lastChunk.length += n;
+ // call to budgetCtrl.reserve() out of synchronized block, or deadlock may happen between MemoryConsumers
+ if (needMoreMem > 0) {
+ try {
+ budgetCtrl.reserve(this, needMoreMem);
+ } catch (NotEnoughBudgetException ex) {
+ deactivateMemWrite();
+ return 0;
+ }
+ }
+
+ synchronized (this) {
+ if (needMoreMem > 0) {
+ MemChunk chunk = new MemChunk();
+ chunk.diskOffset = diskOffset;
+ chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+ if (chunkCount == 0) {
+ firstChunk = lastChunk = chunk;
+ } else {
+ lastChunk.next = chunk;
+ lastChunk = chunk;
+ }
+ chunkCount++;
+ }
+
+ int n = Math.min(lastChunk.freeSpace(), length);
+ System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+ lastChunk.length += n;
- asyncFlush(lastChunk, diskOffset, n);
+ if (n > 0)
+ asyncFlush(lastChunk, diskOffset, n);
- return n;
+ return n;
+ }
}
private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
@@ -365,11 +413,13 @@ public class GTMemDiskStore implements IGTStore, Closeable {
byte[] data;
int offset = 0;
int length = 0;
+ int flushedLen = 0;
while (true) {
data = null;
synchronized (memPart) {
- asyncFlushDiskOffset += length; // bytes written in last loop
+ asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+ // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
asyncFlushChunk = asyncFlushChunk.next;
}
@@ -383,31 +433,11 @@ public class GTMemDiskStore implements IGTStore, Closeable {
if (data == null)
break;
- diskPart.write(asyncFlushDiskOffset, data, offset, length);
- }
- }
-
- private void allocateNewMemChunk(long diskOffset) {
- try {
- budgetCtrl.reserve(this, (chunkCount + 1) * MEM_CHUNK_SIZE_MB);
- } catch (NotEnoughBudgetException ex) {
- deactivateMemWrite();
- return;
- }
-
- MemChunk chunk = new MemChunk();
- chunk.diskOffset = diskOffset;
- chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
- if (chunkCount == 0) {
- firstChunk = lastChunk = chunk;
- } else {
- lastChunk.next = chunk;
- lastChunk = chunk;
+ flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
}
- chunkCount++;
}
- synchronized public void finishAsyncFlush() throws IOException {
+ public void finishAsyncFlush() throws IOException {
deactivateMemWrite();
if (asyncFlusher != null) {
try {
@@ -449,12 +479,12 @@ public class GTMemDiskStore implements IGTStore, Closeable {
public void activateMemWrite() {
writeActivated = true;
- logger.debug(this + " mem write activated");
+ logger.debug(GTMemDiskStore.this + " mem write activated");
}
public void deactivateMemWrite() {
writeActivated = false;
- logger.debug(this + " mem write de-activated");
+ logger.debug(GTMemDiskStore.this + " mem write de-activated");
}
synchronized public void clear() {
@@ -469,40 +499,74 @@ public class GTMemDiskStore implements IGTStore, Closeable {
clear();
}
+ @Override
+ public String toString() {
+ return GTMemDiskStore.this.toString();
+ }
+
}
private class DiskPart implements Closeable {
- final FileChannel channel;
+ final File diskFile;
+ FileChannel writeChannel;
+ FileChannel readChannel;
long tailOffset;
- DiskPart(File diskFile, boolean delOnClose) throws IOException {
- HashSet<StandardOpenOption> opts = new HashSet<StandardOpenOption>(10);
- opts.add(StandardOpenOption.CREATE);
- opts.add(StandardOpenOption.APPEND);
- opts.add(StandardOpenOption.WRITE);
- opts.add(StandardOpenOption.READ);
- if (delOnClose)
- opts.add(StandardOpenOption.DELETE_ON_CLOSE);
- this.channel = FileChannel.open(diskFile.toPath(), opts);
+ DiskPart(File diskFile) throws IOException {
+ this.diskFile = diskFile;
+ this.tailOffset = diskFile.length();
+ logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+ }
+
+ public void openRead() throws IOException {
+ readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+ tailOffset = diskFile.length();
}
public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
- return channel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
}
- void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
- channel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
- tailOffset = Math.max(diskOffset + length, tailOffset);
+ public void closeRead() throws IOException {
+ if (readChannel != null) {
+ readChannel.close();
+ readChannel = null;
+ }
+ }
+
+ public void openWrite(boolean append) throws IOException {
+ if (append) {
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+ tailOffset = diskFile.length();
+ } else {
+ diskFile.delete();
+ writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+ tailOffset = 0;
+ }
+ }
+
+ public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+ int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+ tailOffset = Math.max(diskOffset + n, tailOffset);
+ return n;
+ }
+
+ public void closeWrite() throws IOException {
+ if (writeChannel != null) {
+ writeChannel.close();
+ writeChannel = null;
+ }
}
- void clear() throws IOException {
+ public void clear() throws IOException {
+ diskFile.delete();
tailOffset = 0;
- channel.truncate(0);
}
@Override
public void close() throws IOException {
- channel.close();
+ closeWrite();
+ closeRead();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
index dd9668e..b7678b8 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -1,5 +1,74 @@
package org.apache.kylin.storage.gridtable;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
+import org.junit.Test;
+
public class GTMemDiskStoreTest {
+ final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+ final GTInfo info = SimpleGridTableTest.advancedInfo();
+ final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
+
+ @Test
+ public void testSingleThreadWriteRead() throws IOException {
+ long start = System.currentTimeMillis();
+ verifyOneTableWriteAndRead();
+ long end = System.currentTimeMillis();
+ System.out.println("Cost " + (end - start) + " millis");
+ }
+
+ @Test
+ public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+ long start = System.currentTimeMillis();
+
+ int nThreads = 5;
+ Thread[] t = new Thread[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ t[i] = new Thread() {
+ public void run() {
+ try {
+ verifyOneTableWriteAndRead();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+ t[i].start();
+ }
+ for (int i = 0; i < nThreads; i++) {
+ t[i].join();
+ }
+
+ long end = System.currentTimeMillis();
+ System.out.println("Cost " + (end - start) + " millis");
+ }
+
+ private void verifyOneTableWriteAndRead() throws IOException {
+ GTMemDiskStore store = new GTMemDiskStore(info, budgetCtrl);
+ GridTable table = new GridTable(info, store);
+ verifyWriteAndRead(table);
+ }
+
+ private void verifyWriteAndRead(GridTable table) throws IOException {
+ GTInfo info = table.getInfo();
+
+ GTBuilder builder = table.rebuild();
+ for (GTRecord r : data) {
+ builder.write(r);
+ }
+ builder.close();
+
+ IGTScanner scanner = table.scan(new GTScanRequest(info));
+ int i = 0;
+ for (GTRecord r : scanner) {
+ assertEquals(data.get(i++), r);
+ }
+ scanner.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0ef50059/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 7b676f2..05e61f8 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -4,9 +4,12 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.BitSet;
+import java.util.List;
import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.storage.gridtable.GTInfo.Builder;
import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
@@ -115,56 +118,81 @@ public class SimpleGridTableTest {
}
static GTBuilder rebuild(GridTable table) throws IOException {
- GTRecord r = new GTRecord(table.getInfo());
GTBuilder builder = table.rebuild();
-
- builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ for (GTRecord rec : mockupData(table.getInfo(), 10)) {
+ builder.write(rec);
+ }
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
return builder;
}
+
+ static List<GTRecord> mockupData(GTInfo info, int nRows) {
+ List<GTRecord> result = new ArrayList<GTRecord>(nRows);
+ int round = nRows / 10;
+ for (int i = 0; i < round; i++) {
+ String d_01_14 = datePlus("2015-01-14", i * 4);
+ String d_01_15 = datePlus("2015-01-15", i * 4);
+ String d_01_16 = datePlus("2015-01-16", i * 4);
+ String d_01_17 = datePlus("2015-01-17", i * 4);
+ result.add(newRec(info, d_01_14, "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_14, "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_15, "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_15, "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_15, "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_16, "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ result.add(newRec(info, d_01_17, "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ }
+ return result;
+ }
+
+ private static String datePlus(String date, int plusDays) {
+ long millis = DateFormat.stringToMillis(date);
+ millis += (1000L * 3600L * 24L) * plusDays;
+ return DateFormat.formatToDateStr(millis);
+ }
+
+ private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
+ GTRecord rec = new GTRecord(info);
+ return rec.setValues(date, name, category, amount, price);
+ }
static void rebuildViaAppend(GridTable table) throws IOException {
- GTRecord r = new GTRecord(table.getInfo());
+ List<GTRecord> data = mockupData(table.getInfo(), 10);
GTBuilder builder;
+ int i = 0;
builder = table.append();
- builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
builder = table.append();
- builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
builder = table.append();
- builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());
builder = table.append();
- builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+ builder.write(data.get(i++));
builder.close();
System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
System.out.println("Written Row Count: " + builder.getWrittenRowCount());