You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/03/07 06:59:55 UTC
git commit: TAJO-657: Missing table stat in RCFile. (jinho)
Repository: incubator-tajo
Updated Branches:
refs/heads/master e3da0cafd -> bd673e645
TAJO-657: Missing table stat in RCFile. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bd673e64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bd673e64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bd673e64
Branch: refs/heads/master
Commit: bd673e64589d636241f83ce6ab16030e7edc6d4b
Parents: e3da0ca
Author: jinossy <ji...@gmail.com>
Authored: Fri Mar 7 14:57:38 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Mar 7 14:57:38 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../querymaster/QueryMasterManagerService.java | 10 ++-
.../resources/webapps/worker/querydetail.jsp | 4 +-
.../resources/webapps/worker/querytasks.jsp | 6 +-
.../java/org/apache/tajo/storage/CSVFile.java | 31 +++++---
.../org/apache/tajo/storage/rcfile/RCFile.java | 77 +++++++++++++++++---
.../tajo/storage/TestCompressionStorages.java | 6 +-
.../org/apache/tajo/storage/TestStorages.java | 10 +++
8 files changed, 113 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 07048dd..5082b8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -521,6 +521,8 @@ Release 0.8.0 - unreleased
TASKS
+ TAJO-657: Missing table stat in RCFile. (jinho)
+
TAJO-659: Add Tajo JDBC documentation. (hyunsik)
TAJO-642: Change tajo documentation tool to sphinx. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 70d40e7..e3e0260 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -145,7 +145,6 @@ public class QueryMasterManagerService extends CompositeService
try {
QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
- LOG.info("statusUpdate from " + attemptId);
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
if (queryMasterTask == null) {
queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
@@ -153,13 +152,16 @@ public class QueryMasterManagerService extends CompositeService
SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
- LOG.info(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+ }
+
if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
- LOG.info(attemptId + " Killed");
+ LOG.warn(attemptId + " Killed");
attempt.handle(
new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
} else {
- LOG.info(attemptId + " updated");
queryMasterTask.getEventHandler().handle(
new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
index 94e5f2e..2d867ed 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -81,10 +81,10 @@ for(SubQuery eachSubQuery: subQueries) {
<p/>
<hr/>
<h3>Logical Plan</h3>
- <pre><%=query.getPlan().getLogicalPlan().toString()%></pre>
+ <pre style="white-space:pre-wrap;"><%=query.getPlan().getLogicalPlan().toString()%></pre>
<hr/>
<h3>Distributed Query Plan</h3>
- <pre><%=query.getPlan().toString()%></pre>
+ <pre style="white-space:pre-wrap;"><%=query.getPlan().toString()%></pre>
<hr/>
</div>
</body>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
index 744c014..ab6ff26 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
@@ -151,16 +151,16 @@
<h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a></h3>
<hr/>
<p/>
- <pre><%=PlannerUtil.buildExplainString(subQuery.getBlock().getPlan())%></pre>
+ <pre style="white-space:pre-wrap;"><%=PlannerUtil.buildExplainString(subQuery.getBlock().getPlan())%></pre>
<p/>
<table border="1" width="100%" class="border_table">
- <tr><td align='right' width='150px'>Status:</td><td><%=subQuery.getState()%></td></tr>
+ <tr><td align='right' width='180px'>Status:</td><td><%=subQuery.getState()%></td></tr>
<tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr>
<tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getTaskScheduler().getRackLocalAssigned()%>)</td></tr>
<tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float)(totalProgress/numTasks))%>%</td></tr>
<tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr>
<tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr>
- <tr><td align='right'>Read Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadRows) + " B)"%></td></tr>
+ <tr><td align='right'>Actual Processed Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadBytes) + " B)"%></td></tr>
<tr><td align='right'>Input Rows:</td><td><%=nf.format(totalReadRows)%></td></tr>
<tr><td align='right'>Output Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalWriteBytes, false) + " (" + nf.format(totalWriteBytes) + " B)"%></td></tr>
<tr><td align='right'>Output Rows:</td><td><%=nf.format(totalWriteRows)%></td></tr>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 6c1923c..cf1ccc0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
@@ -74,6 +75,7 @@ public class CSVFile {
private int BUFFER_SIZE = 128 * 1024;
private int bufferedBytes = 0;
private long pos = 0;
+ private boolean isShuffle;
private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
private SerializerDeserializer serde;
@@ -99,6 +101,15 @@ public class CSVFile {
throw new FileNotFoundException(path.toString());
}
+ //determine the intermediate file type
+ String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+ TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+ if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+ isShuffle = true;
+ } else {
+ isShuffle = false;
+ }
+
String codecName = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
if(!StringUtils.isEmpty(codecName)){
codecFactory = new CompressionCodecFactory(conf);
@@ -157,7 +168,8 @@ public class CSVFile {
os.write((byte) delimiter);
rowBytes += 1;
}
- if (enabledStats) {
+ if (isShuffle) {
+ // Collect per-column min/max statistics; they are used only for intermediate (shuffle) files.
stats.analyzeField(i, datum);
}
}
@@ -344,8 +356,7 @@ public class CSVFile {
}
if (startOffset != 0) {
- startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
- pos = startOffset;
+ pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
}
eof = false;
page();
@@ -407,7 +418,7 @@ public class CSVFile {
}
}
if (tableStats != null) {
- tableStats.setReadBytes(getFilePosition() - startOffset);
+ tableStats.setReadBytes(pos - startOffset);
tableStats.setNumRows(recordCount);
}
}
@@ -419,16 +430,12 @@ public class CSVFile {
return 1.0f;
}
long filePos = getFilePosition();
-
- if (tableStats != null) {
- tableStats.setReadBytes(filePos - startOffset);
- tableStats.setNumRows(recordCount);
- }
-
if (startOffset == filePos) {
return 0.0f;
} else {
- return Math.min(1.0f, (float)(filePos - startOffset) / (float)(end - startOffset));
+ long readBytes = filePos - startOffset;
+ long remainingBytes = Math.max(end - filePos, 0);
+ return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes));
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
@@ -485,7 +492,7 @@ public class CSVFile {
public void close() throws IOException {
try {
if (tableStats != null) {
- tableStats.setReadBytes(fragment.getEndKey());
+ tableStats.setReadBytes(pos - startOffset); //Actual Processed Bytes. (decompressed bytes + overhead)
tableStats.setNumRows(recordCount);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 90d7f48..07f4c8e 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
@@ -454,6 +455,7 @@ public class RCFile {
CompressionCodec codec;
Decompressor decompressor = null;
NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
+ private long readBytes = 0;
public ValueBuffer(KeyBuffer currentKey, int columnNumber,
@@ -464,7 +466,6 @@ public class RCFile {
this.skippedColIDs = skippedIDs;
this.codec = codec;
loadedColumnsValueBuffer = new NonSyncByteArrayOutputStream[targets.length];
-
if (codec != null) {
decompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
}
@@ -522,6 +523,7 @@ public class RCFile {
valBuf.reset();
valBuf.write(in, vaRowsLen);
}
+ readBytes += keyBuffer.eachColumnUncompressedValueLen[i];
addIndex++;
}
@@ -530,8 +532,13 @@ public class RCFile {
}
}
+ public long getReadBytes() {
+ return readBytes;
+ }
+
public void clearColumnBuffer() throws IOException {
decompressBuffer.reset();
+ readBytes = 0;
}
@Override
@@ -592,7 +599,8 @@ public class RCFile {
private ColumnBuffer[] columnBuffers = null;
boolean useNewMagic = true;
private byte[] nullChars;
- SerializerDeserializer serde;
+ private SerializerDeserializer serde;
+ private boolean isShuffle;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -717,6 +725,15 @@ public class RCFile {
throw new FileNotFoundException(path.toString());
}
+ //determine the intermediate file type
+ String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+ TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+ if (enabledStats && CatalogProtos.StoreType.RCFILE == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+ isShuffle = true;
+ } else {
+ isShuffle = false;
+ }
+
String codecClassname = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
if (!StringUtils.isEmpty(codecClassname)) {
try {
@@ -875,7 +892,8 @@ public class RCFile {
Datum datum = tuple.get(i);
int length = columnBuffers[i].append(schema.getColumn(i), datum);
columnBufferSize += length;
- if (enabledStats) {
+ if (isShuffle) {
+ // Collect per-column min/max statistics; they are used only for intermediate (shuffle) files.
stats.analyzeField(i, datum);
}
}
@@ -883,7 +901,7 @@ public class RCFile {
if (size < columnNumber) {
for (int i = size; i < columnNumber; i++) {
columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
- if (enabledStats) {
+ if (isShuffle) {
stats.analyzeField(i, NullDatum.get());
}
}
@@ -1083,6 +1101,10 @@ public class RCFile {
clearColumnBuffers();
if (out != null) {
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
// Close the underlying stream if we own it...
out.flush();
IOUtils.cleanup(LOG, out);
@@ -1139,6 +1161,7 @@ public class RCFile {
private Decompressor keyDecompressor;
+ private long readBytes = 0;
//Current state of each selected column - e.g. current run length, etc.
// The size of the array is equal to the number of selected columns
@@ -1154,19 +1177,19 @@ public class RCFile {
public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
final FileFragment fragment) throws IOException {
super(conf, schema, meta, fragment);
-
- rowId = new LongWritable();
conf.setInt("io.file.buffer.size", 4096); //TODO remove
-
startOffset = fragment.getStartKey();
endOffset = startOffset + fragment.getEndKey();
- more = startOffset < endOffset;
start = 0;
}
@Override
public void init() throws IOException {
+ more = startOffset < endOffset;
+ rowId = new LongWritable();
+ readBytes = 0;
+
String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
if (StringUtils.isEmpty(nullCharacters)) {
nullChars = NullDatum.get().asTextBytes();
@@ -1353,6 +1376,7 @@ public class RCFile {
in.readFully(sync); // read sync bytes
headerEnd = in.getPos();
+ readBytes += headerEnd;
}
/**
@@ -1418,6 +1442,7 @@ public class RCFile {
while (n > 0 && (in.getPos() + n) <= end) {
position = in.getPos();
in.readFully(buffer, prefix, n);
+ readBytes += n;
/* the buffer has n+sync bytes */
for (int i = 0; i < n; i++) {
int j;
@@ -1464,11 +1489,13 @@ public class RCFile {
return -1;
}
int length = in.readInt();
+ readBytes += 4;
if (sync != null && length == SYNC_ESCAPE) { // process
// a
// sync entry
lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
in.readFully(syncCheck); // read syncCheck
+ readBytes += SYNC_HASH_SIZE;
if (!Arrays.equals(sync, syncCheck)) {
throw new IOException("File is corrupt!");
}
@@ -1477,6 +1504,7 @@ public class RCFile {
return -1;
}
length = in.readInt(); // re-read length
+ readBytes += 4;
} else {
syncSeen = false;
}
@@ -1508,6 +1536,7 @@ public class RCFile {
}
currentKeyLength = in.readInt();
compressedKeyLen = in.readInt();
+ readBytes += 8;
if (decompress) {
byte[] compressedBytes = new byte[compressedKeyLen];
@@ -1537,7 +1566,7 @@ public class RCFile {
} else {
currentKey.readFields(in);
}
-
+ readBytes += currentKeyLength;
keyInit = true;
currentValue.inited = false;
@@ -1573,6 +1602,12 @@ public class RCFile {
currentValue.clearColumnBuffer();
currentValue.readFields(in);
currentValue.inited = true;
+ readBytes += currentValue.getReadBytes();
+
+ if (tableStats != null) {
+ tableStats.setReadBytes(readBytes);
+ tableStats.setNumRows(passedRowsNum);
+ }
}
private boolean rowFetched = false;
@@ -1599,6 +1634,25 @@ public class RCFile {
return tuple;
}
+ @Override
+ public float getProgress() {
+ try {
+ if(!more) {
+ return 1.0f;
+ }
+ long filePos = getPosition();
+ if (startOffset == filePos) {
+ return 0.0f;
+ } else {
+ // After the scanner reads the file header, filePos may be smaller than startOffset, so clamp the difference at zero.
+ return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getEndKey()));
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return 0.0f;
+ }
+ }
+
/**
* Returns how many rows we fetched with nextBuffer(). It only means how many rows
* are read by nextBuffer(). The returned result may be smaller than actual number
@@ -1729,6 +1783,11 @@ public class RCFile {
@Override
public void close() throws IOException {
+ if (tableStats != null) {
+ tableStats.setReadBytes(readBytes); //Actual Processed Bytes. (decompressed bytes + header - seek)
+ tableStats.setNumRows(passedRowsNum);
+ }
+
IOUtils.cleanup(LOG, in, currentValue);
if (keyDecompressor != null) {
// Make sure we only return decompressor once.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index bec1556..a776eb6 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -42,9 +42,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class TestCompressionStorages {
@@ -229,5 +227,7 @@ public class TestCompressionStorages {
}
scanner.close();
assertEquals(tupleNum, tupleCnt);
+ assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index f4ce46b..3743c4d 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -273,6 +273,7 @@ public class TestStorages {
Path tablePath = new Path(testDir, "testVariousTypes.data");
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.enableStats();
appender.init();
QueryId queryid = new QueryId("12345", 5);
@@ -299,6 +300,8 @@ public class TestStorages {
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
scanner.init();
@@ -310,6 +313,8 @@ public class TestStorages {
}
}
scanner.close();
+ assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
}
@Test
@@ -337,6 +342,7 @@ public class TestStorages {
Path tablePath = new Path(testDir, "testVariousTypes.data");
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.enableStats();
appender.init();
QueryId queryid = new QueryId("12345", 5);
@@ -363,6 +369,8 @@ public class TestStorages {
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
scanner.init();
@@ -374,6 +382,8 @@ public class TestStorages {
}
}
scanner.close();
+ assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
}
@Test