You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:30:51 UTC

[06/12] drill git commit: DRILL-2723: Add option to change text format row size estimate.

DRILL-2723: Add option to change text format row size estimate.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5de7d6e9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5de7d6e9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5de7d6e9

Branch: refs/heads/merge_2015_05_09
Commit: 5de7d6e9059f5877b4bd368ab12022e47a74baa6
Parents: fb5e455
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat May 9 11:20:47 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:24:21 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +++
 .../exec/physical/base/AbstractGroupScan.java   | 13 +++++++++-
 .../drill/exec/physical/base/GroupScan.java     |  6 ++---
 .../exec/planner/logical/DrillScanRel.java      | 25 +++++++++++--------
 .../physical/ConvertCountToDirectScan.java      | 25 ++++++++++---------
 .../drill/exec/planner/physical/PrelUtil.java   |  6 ++++-
 .../drill/exec/planner/physical/ScanPrel.java   | 10 +++++---
 .../server/options/SystemOptionManager.java     |  1 +
 .../exec/store/dfs/easy/EasyFormatPlugin.java   | 14 +++++++++++
 .../exec/store/dfs/easy/EasyGroupScan.java      | 26 ++++++++++++--------
 .../exec/store/easy/text/TextFormatPlugin.java  | 19 ++++++++++++--
 11 files changed, 105 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 97d5770..1a10aa2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.RangeLongValidator;
+import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
 import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
 import org.apache.drill.exec.testing.ExecutionControls;
 
@@ -116,6 +117,8 @@ public interface ExecConstants {
   public static String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
   public static BooleanValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE, false);
   public static final BooleanValidator JSON_EXTENDED_TYPES = new BooleanValidator("store.json.extended_types", true);
+  public static final DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator(
+      "store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE, 100.0);
 
   /**
    * The column label (for directory levels) in results when querying files in a directory

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 242bd5e..5c4ee4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -22,10 +22,11 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Iterators;
-import org.apache.drill.exec.physical.EndpointAffinity;
 
 public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
@@ -70,6 +71,16 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
   }
 
   @Override
+  public ScanStats getScanStats(PlannerSettings settings) {
+    return getScanStats();
+  }
+
+  @JsonIgnore
+  public ScanStats getScanStats() {
+    throw new UnsupportedOperationException("This should be implemented.");
+  }
+
+  @Override
   @JsonIgnore
   public boolean enforceWidth() {
     return false;

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 60b8330..2d16cd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -19,14 +19,14 @@ package org.apache.drill.exec.physical.base;
 
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
 
 /**
  * A GroupScan operator represents all data which will be scanned by a given physical
@@ -68,7 +68,7 @@ public interface GroupScan extends Scan, HasAffinity{
   public abstract String getDigest();
 
   @JsonIgnore
-  public ScanStats getScanStats();
+  public ScanStats getScanStats(PlannerSettings settings);
 
   /**
    * Returns a clone of GroupScan instance, except that the new GroupScan will use the provided list of columns .

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index a724e23..9a009ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.torel.ConversionContext;
 import org.apache.calcite.rel.RelWriter;
@@ -52,19 +53,22 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
   final private RelDataType rowType;
   private GroupScan groupScan;
   private List<SchemaPath> columns;
+  private PlannerSettings settings;
 
   /** Creates a DrillScan. */
-  public DrillScanRel(RelOptCluster cluster, RelTraitSet traits,
-      RelOptTable table) {
+  public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
+      final RelOptTable table) {
     // By default, scan does not support project pushdown.
     // Decision whether push projects into scan will be made solely in DrillPushProjIntoScanRule.
     this(cluster, traits, table, table.getRowType(), GroupScan.ALL_COLUMNS);
+    this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
   }
 
   /** Creates a DrillScan. */
-  public DrillScanRel(RelOptCluster cluster, RelTraitSet traits,
-      RelOptTable table, RelDataType rowType, List<SchemaPath> columns) {
+  public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
+      final RelOptTable table, final RelDataType rowType, final List<SchemaPath> columns) {
     super(DRILL_LOGICAL, cluster, traits, table);
+    this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
     this.rowType = rowType;
     if (columns == null) { // planner asks to scan all of the columns
       this.columns =  ColumnList.all();
@@ -75,18 +79,19 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
     }
     try {
       this.groupScan = drillTable.getGroupScan().clone(this.columns);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException("Failure creating scan.", e);
     }
   }
 
   /** Creates a DrillScanRel for a particular GroupScan */
-  public DrillScanRel(RelOptCluster cluster, RelTraitSet traits,
-      RelOptTable table, GroupScan groupScan, RelDataType rowType, List<SchemaPath> columns) {
+  public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
+      final RelOptTable table, final GroupScan groupScan, final RelDataType rowType, final List<SchemaPath> columns) {
     super(DRILL_LOGICAL, cluster, traits, table);
     this.rowType = rowType;
     this.columns = columns;
     this.groupScan = groupScan;
+    this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
   }
 
 //
@@ -128,15 +133,15 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
 
   @Override
   public double getRows() {
-    return this.groupScan.getScanStats().getRecordCount();
+    return this.groupScan.getScanStats(settings).getRecordCount();
   }
 
   /// TODO: this method is same as the one for ScanPrel...eventually we should consolidate
   /// this and few other methods in a common base class which would be extended
   /// by both logical and physical rels.
   @Override
-  public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    ScanStats stats = groupScan.getScanStats();
+  public RelOptCost computeSelfCost(final RelOptPlanner planner) {
+    final ScanStats stats = groupScan.getScanStats(settings);
     int columnCount = getRowType().getFieldCount();
     double ioCost = 0;
     boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() {

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
index 8b71b70..3b820b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
@@ -21,18 +21,10 @@ package org.apache.drill.exec.planner.physical;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.logical.DrillAggregateRel;
-import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillScanRel;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.store.direct.DirectGroupScan;
-import org.apache.drill.exec.store.pojo.PojoRecordReader;
-import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -41,6 +33,14 @@ import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillProjectRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.store.direct.DirectGroupScan;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
 import com.google.common.collect.Lists;
 
@@ -88,14 +88,15 @@ public class ConvertCountToDirectScan extends Prule {
     final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
     final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
 
-    GroupScan oldGrpScan = scan.getGroupScan();
+    final GroupScan oldGrpScan = scan.getGroupScan();
+    final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
 
     // Only apply the rule when :
     //    1) scan knows the exact row count in getSize() call,
     //    2) No GroupBY key,
     //    3) only one agg function (Check if it's count(*) below).
     //    4) No distinct agg call.
-    if (! (oldGrpScan.getScanStats().getGroupScanProperty().hasExactRowCount()
+    if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
         && agg.getGroupCount() == 0
         && agg.getAggCallList().size() == 1
         && !agg.containsDistinctCall())) {
@@ -112,7 +113,7 @@ public class ConvertCountToDirectScan extends Prule {
       if (aggCall.getArgList().isEmpty() ||
           (aggCall.getArgList().size() == 1 &&
            ! agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
-        cnt = (long) oldGrpScan.getScanStats().getRecordCount();
+        cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
       } else if (aggCall.getArgList().size() == 1) {
       // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
         int index = aggCall.getArgList().get(0);

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index 44d828b..d63bf54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -82,7 +82,11 @@ public class PrelUtil {
     return (Iterator<Prel>) (Object) nodes.iterator();
   }
 
-  public static PlannerSettings getSettings(RelOptCluster cluster) {
+  public static PlannerSettings getSettings(final RelOptCluster cluster) {
+    return getPlannerSettings(cluster);
+  }
+
+  public static PlannerSettings getPlannerSettings(final RelOptCluster cluster) {
     return cluster.getPlanner().getContext().unwrap(PlannerSettings.class);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index da558b2..d48e7cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -101,13 +101,15 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
 
   @Override
   public double getRows() {
-    return this.groupScan.getScanStats().getRecordCount();
+    final PlannerSettings settings = PrelUtil.getPlannerSettings(getCluster());
+    return this.groupScan.getScanStats(settings).getRecordCount();
   }
 
   @Override
-  public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    ScanStats stats = this.groupScan.getScanStats();
-    int columnCount = this.getRowType().getFieldCount();
+  public RelOptCost computeSelfCost(final RelOptPlanner planner) {
+    final PlannerSettings settings = PrelUtil.getPlannerSettings(planner);
+    final ScanStats stats = this.groupScan.getScanStats(settings);
+    final int columnCount = this.getRowType().getFieldCount();
 
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
       return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost());

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e9ee68b..e268e64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -81,6 +81,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
+      ExecConstants.TEXT_ESTIMATED_ROW_SIZE,
       ExecConstants.JSON_EXTENDED_TYPES,
       ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
       ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 762760a..d1b46e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -34,9 +34,12 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -48,6 +51,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.ImmutableSet;
@@ -200,6 +204,16 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     }
   }
 
+  protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) {
+    long data = 0;
+    for (final CompleteFileWork work : scan.getWorkIterable()) {
+      data += work.getTotalBytes();
+    }
+
+    final long estRowCount = data / 1024;
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
+  }
+
   @Override
   public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
     return new EasyWriter(child, location, this);

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 5f9e02b..a559beb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -30,7 +31,7 @@ import org.apache.drill.exec.physical.base.FileGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -48,6 +49,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 
@@ -102,7 +104,17 @@ public class EasyGroupScan extends AbstractFileGroupScan{
     initFromSelection(selection, formatPlugin);
   }
 
-  private EasyGroupScan(EasyGroupScan that) {
+  @JsonIgnore
+  public Iterable<CompleteFileWork> getWorkIterable() {
+    return new Iterable<CompleteFileWork>() {
+      @Override
+      public Iterator<CompleteFileWork> iterator() {
+        return Iterators.unmodifiableIterator(chunks.iterator());
+      }
+    };
+  }
+
+  private EasyGroupScan(final EasyGroupScan that) {
     super(that.getUserName());
     selection = that.selection;
     formatPlugin = that.formatPlugin;
@@ -134,14 +146,8 @@ public class EasyGroupScan extends AbstractFileGroupScan{
 
 
   @Override
-  public ScanStats getScanStats() {
-    long data =0;
-    for (CompleteFileWork work : chunks) {
-      data += work.getTotalBytes();
-    }
-
-    long estRowCount = data/1024;
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
+  public ScanStats getScanStats(final PlannerSettings settings) {
+    return formatPlugin.getScanStats(settings, this);
   }
 
   @JsonProperty("files")

http://git-wip-us.apache.org/repos/asf/drill/blob/5de7d6e9/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 5756a6a..967d920 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -29,6 +29,9 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -43,6 +46,7 @@ import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader;
 import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.hadoop.conf.Configuration;
@@ -95,8 +99,19 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
-    Map<String, String> options = Maps.newHashMap();
+  protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) {
+    long data = 0;
+    for (final CompleteFileWork work : scan.getWorkIterable()) {
+      data += work.getTotalBytes();
+    }
+    final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
+    final double estRowCount = data / estimatedRowSize;
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
+    final Map<String, String> options = Maps.newHashMap();
 
     options.put("location", writer.getLocation());