You are viewing a plain text version of this content. The canonical link to the original mail-archive page was not preserved in this plain-text copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/03/07 06:59:55 UTC

git commit: TAJO-657: Missing table stat in RCFile. (jinho)

Repository: incubator-tajo
Updated Branches:
  refs/heads/master e3da0cafd -> bd673e645


TAJO-657: Missing table stat in RCFile. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bd673e64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bd673e64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bd673e64

Branch: refs/heads/master
Commit: bd673e64589d636241f83ce6ab16030e7edc6d4b
Parents: e3da0ca
Author: jinossy <ji...@gmail.com>
Authored: Fri Mar 7 14:57:38 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Mar 7 14:57:38 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../querymaster/QueryMasterManagerService.java  | 10 ++-
 .../resources/webapps/worker/querydetail.jsp    |  4 +-
 .../resources/webapps/worker/querytasks.jsp     |  6 +-
 .../java/org/apache/tajo/storage/CSVFile.java   | 31 +++++---
 .../org/apache/tajo/storage/rcfile/RCFile.java  | 77 +++++++++++++++++---
 .../tajo/storage/TestCompressionStorages.java   |  6 +-
 .../org/apache/tajo/storage/TestStorages.java   | 10 +++
 8 files changed, 113 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 07048dd..5082b8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -521,6 +521,8 @@ Release 0.8.0 - unreleased
 
   TASKS
 
+    TAJO-657: Missing table stat in RCFile. (jinho)
+
     TAJO-659: Add Tajo JDBC documentation. (hyunsik)
 
     TAJO-642: Change tajo documentation tool to sphinx. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 70d40e7..e3e0260 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -145,7 +145,6 @@ public class QueryMasterManagerService extends CompositeService
     try {
       QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
       QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
-      LOG.info("statusUpdate from " + attemptId);
       QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
       if (queryMasterTask == null) {
         queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
@@ -153,13 +152,16 @@ public class QueryMasterManagerService extends CompositeService
       SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
       QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
       QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
-      LOG.info(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+      }
+
       if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
-        LOG.info(attemptId + " Killed");
+        LOG.warn(attemptId + " Killed");
         attempt.handle(
             new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
       } else {
-        LOG.info(attemptId + " updated");
         queryMasterTask.getEventHandler().handle(
             new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
index 94e5f2e..2d867ed 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -81,10 +81,10 @@ for(SubQuery eachSubQuery: subQueries) {
   <p/>
   <hr/>
   <h3>Logical Plan</h3>
-  <pre><%=query.getPlan().getLogicalPlan().toString()%></pre>
+  <pre style="white-space:pre-wrap;"><%=query.getPlan().getLogicalPlan().toString()%></pre>
   <hr/>
   <h3>Distributed Query Plan</h3>
-  <pre><%=query.getPlan().toString()%></pre>
+  <pre style="white-space:pre-wrap;"><%=query.getPlan().toString()%></pre>
   <hr/>
 </div>
 </body>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
index 744c014..ab6ff26 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querytasks.jsp
@@ -151,16 +151,16 @@
   <h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a></h3>
   <hr/>
   <p/>
-  <pre><%=PlannerUtil.buildExplainString(subQuery.getBlock().getPlan())%></pre>
+  <pre style="white-space:pre-wrap;"><%=PlannerUtil.buildExplainString(subQuery.getBlock().getPlan())%></pre>
   <p/>
   <table border="1" width="100%" class="border_table">
-    <tr><td align='right' width='150px'>Status:</td><td><%=subQuery.getState()%></td></tr>
+    <tr><td align='right' width='180px'>Status:</td><td><%=subQuery.getState()%></td></tr>
     <tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr>
     <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getTaskScheduler().getRackLocalAssigned()%>)</td></tr>
     <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float)(totalProgress/numTasks))%>%</td></tr>
     <tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr>
     <tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr>
-    <tr><td align='right'>Read Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadRows) + " B)"%></td></tr>
+    <tr><td align='right'>Actual Processed Bytes:</td><td><%=totalReadBytes == 0 ? "-" : FileUtil.humanReadableByteCount(totalReadBytes, false) + " (" + nf.format(totalReadBytes) + " B)"%></td></tr>
     <tr><td align='right'>Input Rows:</td><td><%=nf.format(totalReadRows)%></td></tr>
     <tr><td align='right'>Output Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalWriteBytes, false) + " (" + nf.format(totalWriteBytes) + " B)"%></td></tr>
     <tr><td align='right'>Output Rows:</td><td><%=nf.format(totalWriteRows)%></td></tr>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 6c1923c..cf1ccc0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
@@ -74,6 +75,7 @@ public class CSVFile {
     private int BUFFER_SIZE = 128 * 1024;
     private int bufferedBytes = 0;
     private long pos = 0;
+    private boolean isShuffle;
 
     private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
     private SerializerDeserializer serde;
@@ -99,6 +101,15 @@ public class CSVFile {
         throw new FileNotFoundException(path.toString());
       }
 
+      //determine the intermediate file type
+      String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+          TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+      if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+        isShuffle = true;
+      } else {
+        isShuffle = false;
+      }
+
       String codecName = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
       if(!StringUtils.isEmpty(codecName)){
         codecFactory = new CompressionCodecFactory(conf);
@@ -157,7 +168,8 @@ public class CSVFile {
           os.write((byte) delimiter);
           rowBytes += 1;
         }
-        if (enabledStats) {
+        if (isShuffle) {
+          // it is to calculate min/max values, and it is only used for the intermediate file.
           stats.analyzeField(i, datum);
         }
       }
@@ -344,8 +356,7 @@ public class CSVFile {
       }
 
       if (startOffset != 0) {
-        startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
-        pos = startOffset;
+        pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
       }
       eof = false;
       page();
@@ -407,7 +418,7 @@ public class CSVFile {
         }
       }
       if (tableStats != null) {
-        tableStats.setReadBytes(getFilePosition() - startOffset);
+        tableStats.setReadBytes(pos - startOffset);
         tableStats.setNumRows(recordCount);
       }
     }
@@ -419,16 +430,12 @@ public class CSVFile {
           return 1.0f;
         }
         long filePos = getFilePosition();
-
-        if (tableStats != null) {
-          tableStats.setReadBytes(filePos - startOffset);
-          tableStats.setNumRows(recordCount);
-        }
-
         if (startOffset == filePos) {
           return 0.0f;
         } else {
-          return Math.min(1.0f, (float)(filePos - startOffset) / (float)(end - startOffset));
+          long readBytes = filePos - startOffset;
+          long remainingBytes = Math.max(end - filePos, 0);
+          return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes));
         }
       } catch (IOException e) {
         LOG.error(e.getMessage(), e);
@@ -485,7 +492,7 @@ public class CSVFile {
     public void close() throws IOException {
       try {
         if (tableStats != null) {
-          tableStats.setReadBytes(fragment.getEndKey());
+          tableStats.setReadBytes(pos - startOffset);  //Actual Processed Bytes. (decompressed bytes + overhead)
           tableStats.setNumRows(recordCount);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 90d7f48..07f4c8e 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
@@ -454,6 +455,7 @@ public class RCFile {
     CompressionCodec codec;
     Decompressor decompressor = null;
     NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
+    private long readBytes = 0;
 
 
     public ValueBuffer(KeyBuffer currentKey, int columnNumber,
@@ -464,7 +466,6 @@ public class RCFile {
       this.skippedColIDs = skippedIDs;
       this.codec = codec;
       loadedColumnsValueBuffer = new NonSyncByteArrayOutputStream[targets.length];
-
       if (codec != null) {
         decompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
       }
@@ -522,6 +523,7 @@ public class RCFile {
           valBuf.reset();
           valBuf.write(in, vaRowsLen);
         }
+        readBytes += keyBuffer.eachColumnUncompressedValueLen[i];
         addIndex++;
       }
 
@@ -530,8 +532,13 @@ public class RCFile {
       }
     }
 
+    public long getReadBytes() {
+      return readBytes;
+    }
+
     public void clearColumnBuffer() throws IOException {
       decompressBuffer.reset();
+      readBytes = 0;
     }
 
     @Override
@@ -592,7 +599,8 @@ public class RCFile {
     private ColumnBuffer[] columnBuffers = null;
     boolean useNewMagic = true;
     private byte[] nullChars;
-    SerializerDeserializer serde;
+    private SerializerDeserializer serde;
+    private boolean isShuffle;
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -717,6 +725,15 @@ public class RCFile {
         throw new FileNotFoundException(path.toString());
       }
 
+      //determine the intermediate file type
+      String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+          TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+      if (enabledStats && CatalogProtos.StoreType.RCFILE == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+        isShuffle = true;
+      } else {
+        isShuffle = false;
+      }
+
       String codecClassname = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
       if (!StringUtils.isEmpty(codecClassname)) {
         try {
@@ -875,7 +892,8 @@ public class RCFile {
         Datum datum = tuple.get(i);
         int length = columnBuffers[i].append(schema.getColumn(i), datum);
         columnBufferSize += length;
-        if (enabledStats) {
+        if (isShuffle) {
+          // it is to calculate min/max values, and it is only used for the intermediate file.
           stats.analyzeField(i, datum);
         }
       }
@@ -883,7 +901,7 @@ public class RCFile {
       if (size < columnNumber) {
         for (int i = size; i < columnNumber; i++) {
           columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
-          if (enabledStats) {
+          if (isShuffle) {
             stats.analyzeField(i, NullDatum.get());
           }
         }
@@ -1083,6 +1101,10 @@ public class RCFile {
       clearColumnBuffers();
 
       if (out != null) {
+        // Statistical section
+        if (enabledStats) {
+          stats.setNumBytes(getOffset());
+        }
         // Close the underlying stream if we own it...
         out.flush();
         IOUtils.cleanup(LOG, out);
@@ -1139,6 +1161,7 @@ public class RCFile {
 
     private Decompressor keyDecompressor;
 
+    private long readBytes = 0;
 
     //Current state of each selected column - e.g. current run length, etc.
     // The size of the array is equal to the number of selected columns
@@ -1154,19 +1177,19 @@ public class RCFile {
     public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
                          final FileFragment fragment) throws IOException {
       super(conf, schema, meta, fragment);
-
-      rowId = new LongWritable();
       conf.setInt("io.file.buffer.size", 4096); //TODO remove
 
-
       startOffset = fragment.getStartKey();
       endOffset = startOffset + fragment.getEndKey();
-      more = startOffset < endOffset;
       start = 0;
     }
 
     @Override
     public void init() throws IOException {
+      more = startOffset < endOffset;
+      rowId = new LongWritable();
+      readBytes = 0;
+
       String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
       if (StringUtils.isEmpty(nullCharacters)) {
         nullChars = NullDatum.get().asTextBytes();
@@ -1353,6 +1376,7 @@ public class RCFile {
 
       in.readFully(sync); // read sync bytes
       headerEnd = in.getPos();
+      readBytes += headerEnd;
     }
 
     /**
@@ -1418,6 +1442,7 @@ public class RCFile {
         while (n > 0 && (in.getPos() + n) <= end) {
           position = in.getPos();
           in.readFully(buffer, prefix, n);
+          readBytes += n;
           /* the buffer has n+sync bytes */
           for (int i = 0; i < n; i++) {
             int j;
@@ -1464,11 +1489,13 @@ public class RCFile {
         return -1;
       }
       int length = in.readInt();
+      readBytes += 4;
       if (sync != null && length == SYNC_ESCAPE) { // process
         // a
         // sync entry
         lastSeenSyncPos = in.getPos() - 4; // minus SYNC_ESCAPE's length
         in.readFully(syncCheck); // read syncCheck
+        readBytes += SYNC_HASH_SIZE;
         if (!Arrays.equals(sync, syncCheck)) {
           throw new IOException("File is corrupt!");
         }
@@ -1477,6 +1504,7 @@ public class RCFile {
           return -1;
         }
         length = in.readInt(); // re-read length
+        readBytes += 4;
       } else {
         syncSeen = false;
       }
@@ -1508,6 +1536,7 @@ public class RCFile {
       }
       currentKeyLength = in.readInt();
       compressedKeyLen = in.readInt();
+      readBytes += 8;
       if (decompress) {
 
         byte[] compressedBytes = new byte[compressedKeyLen];
@@ -1537,7 +1566,7 @@ public class RCFile {
       } else {
         currentKey.readFields(in);
       }
-
+      readBytes += currentKeyLength;
       keyInit = true;
       currentValue.inited = false;
 
@@ -1573,6 +1602,12 @@ public class RCFile {
       currentValue.clearColumnBuffer();
       currentValue.readFields(in);
       currentValue.inited = true;
+      readBytes += currentValue.getReadBytes();
+
+      if (tableStats != null) {
+        tableStats.setReadBytes(readBytes);
+        tableStats.setNumRows(passedRowsNum);
+      }
     }
 
     private boolean rowFetched = false;
@@ -1599,6 +1634,25 @@ public class RCFile {
       return tuple;
     }
 
+    @Override
+    public float getProgress() {
+      try {
+        if(!more) {
+          return 1.0f;
+        }
+        long filePos = getPosition();
+        if (startOffset == filePos) {
+          return 0.0f;
+        } else {
+          //if scanner read the header, filePos moved to zero
+          return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getEndKey()));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return 0.0f;
+      }
+    }
+
     /**
      * Returns how many rows we fetched with nextBuffer(). It only means how many rows
      * are read by nextBuffer(). The returned result may be smaller than actual number
@@ -1729,6 +1783,11 @@ public class RCFile {
 
     @Override
     public void close() throws IOException {
+      if (tableStats != null) {
+        tableStats.setReadBytes(readBytes);  //Actual Processed Bytes. (decompressed bytes + header - seek)
+        tableStats.setNumRows(passedRowsNum);
+      }
+
       IOUtils.cleanup(LOG, in, currentValue);
       if (keyDecompressor != null) {
         // Make sure we only return decompressor once.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index bec1556..a776eb6 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -42,9 +42,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 @RunWith(Parameterized.class)
 public class TestCompressionStorages {
@@ -229,5 +227,7 @@ public class TestCompressionStorages {
     }
     scanner.close();
     assertEquals(tupleNum, tupleCnt);
+    assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bd673e64/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index f4ce46b..3743c4d 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -273,6 +273,7 @@ public class TestStorages {
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
     appender.init();
 
     QueryId queryid = new QueryId("12345", 5);
@@ -299,6 +300,8 @@ public class TestStorages {
     appender.close();
 
     FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
     Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
@@ -310,6 +313,8 @@ public class TestStorages {
       }
     }
     scanner.close();
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
   }
 
   @Test
@@ -337,6 +342,7 @@ public class TestStorages {
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
     appender.init();
 
     QueryId queryid = new QueryId("12345", 5);
@@ -363,6 +369,8 @@ public class TestStorages {
     appender.close();
 
     FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
     Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
@@ -374,6 +382,8 @@ public class TestStorages {
       }
     }
     scanner.close();
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
   }
 
   @Test