You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/10 01:43:01 UTC

[2/2] git commit: DRILL-576: Add costing - new plans for joins and aggregations, including distributions. - Utilize GroupScan getSize() for costing - Add cleanup() methods to MergeJoinBatch and HashJoinBatch. - Don't match hash aggr rule if number of gro

DRILL-576: Add costing
- new plans for joins and aggregations, including distributions.
- Utilize GroupScan getSize() for costing
- Add cleanup() methods to MergeJoinBatch and HashJoinBatch.
- Don't match hash aggr rule if number of grouping cols is 0.
- Fix initialization of maxOccupiedIndex in HashAggr and HashTable.
- Fix less-than comparison for cost when row counts are the same.
- Improve fragment identification for better debugging.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3fa1322b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3fa1322b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3fa1322b

Branch: refs/heads/master
Commit: 3fa1322b31605de1e47e70912ad5aadf598c4e99
Parents: b7bf00c
Author: Aman Sinha <as...@mapr.com>
Authored: Fri Apr 4 11:56:44 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri May 9 16:36:18 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ops/QueryContext.java |   1 +
 .../drill/exec/physical/base/Exchange.java      |   9 -
 .../exec/physical/config/BroadcastExchange.java |   5 -
 .../physical/config/HashToMergeExchange.java    |  93 ++++++
 .../physical/config/HashToRandomExchange.java   |   5 -
 .../config/OrderedPartitionExchange.java        |  10 -
 .../physical/config/SingleMergeExchange.java    |   7 +-
 .../exec/physical/config/UnionExchange.java     |   6 -
 .../physical/impl/aggregate/HashAggBatch.java   |   6 +-
 .../impl/aggregate/HashAggTemplate.java         |   9 +-
 .../physical/impl/common/HashTableTemplate.java |   2 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |  11 +-
 .../exec/planner/common/DrillFilterRelBase.java |  23 +-
 .../exec/planner/common/DrillLimitRelBase.java  |  20 ++
 .../planner/common/DrillProjectRelBase.java     |  14 +-
 .../exec/planner/common/DrillScanRelBase.java   |   5 +-
 .../exec/planner/common/DrillScreenRelBase.java |  12 +-
 .../drill/exec/planner/cost/DrillCostBase.java  | 311 +++++++++++++++++++
 .../exec/planner/cost/DrillRelOptCost.java      |  28 ++
 .../planner/cost/DrillRelOptCostFactory.java    |  31 ++
 .../exec/planner/logical/DrillAggregateRel.java |   5 -
 .../planner/logical/DrillAggregateRule.java     |   6 +
 .../exec/planner/logical/DrillRuleSets.java     |  15 +-
 .../exec/planner/logical/DrillScanRel.java      |  43 ++-
 .../exec/planner/physical/AggPruleBase.java     |  57 ++++
 .../planner/physical/BroadcastExchangePrel.java |  83 +++++
 .../physical/DrillDistributionTrait.java        |   5 +-
 .../physical/DrillDistributionTraitDef.java     |  17 +-
 .../drill/exec/planner/physical/FilterPrel.java |   8 +-
 .../exec/planner/physical/HashAggPrel.java      | 127 ++++++++
 .../exec/planner/physical/HashAggPrule.java     |  93 ++++++
 .../exec/planner/physical/HashJoinPrel.java     | 166 ++++++++++
 .../exec/planner/physical/HashJoinPrule.java    |  92 ++++++
 .../physical/HashToMergeExchangePrel.java       | 106 +++++++
 .../physical/HashToRandomExchangePrel.java      |  35 ++-
 .../exec/planner/physical/JoinPruleBase.java    |  66 ++++
 .../exec/planner/physical/MergeJoinPrel.java    |  21 +-
 .../exec/planner/physical/MergeJoinPrule.java   |  75 ++---
 .../physical/OrderedPartitionExchangePrel.java  |  18 +-
 .../exec/planner/physical/PlannerSettings.java  |  18 ++
 .../drill/exec/planner/physical/ScanPrel.java   |  53 +++-
 .../physical/SingleMergeExchangePrel.java       |  37 ++-
 .../drill/exec/planner/physical/SortPrel.java   |  27 +-
 .../exec/planner/physical/StreamAggPrel.java    |  26 +-
 .../exec/planner/physical/StreamAggPrule.java   | 103 +++---
 .../drill/exec/planner/physical/TopNPrel.java   |  23 +-
 .../planner/physical/UnionExchangePrel.java     |  27 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |  14 +
 .../org/apache/drill/exec/work/WorkManager.java |   3 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   7 +
 .../exec/work/fragment/FragmentExecutor.java    |   9 +
 .../org/apache/drill/TestAltSortQueries.java    |   2 +
 .../org/apache/drill/TestTpchDistributed.java   |   1 +
 pom.xml                                         |   2 +-
 54 files changed, 1792 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index e3d2f54..e541200 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -57,6 +57,7 @@ public class QueryContext{
     this.session = session;
     this.timer = new Multitimer<>(QuerySetup.class);
     this.plannerSettings = new PlannerSettings(session.getOptions());
+    this.plannerSettings.setNumEndPoints(this.getActiveEndpoints().size()); 
   }
 
   public PlannerSettings getPlannerSettings(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
index 80cf5a2..2d2372a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -91,13 +91,4 @@ public interface Exchange extends PhysicalOperator {
    */
   public PhysicalOperator getChild();
   
-  
-  
-  /**
-   * Informs the planner whether or not this particular exchange supports an incoming stream that has an attached selection vector. 
-   * @param mode
-   * @return
-   */
-  @JsonIgnore
-  public boolean supportsSelectionVector();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
index 256d3d9..95d8803 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -71,9 +71,4 @@ public class BroadcastExchange extends AbstractExchange {
   public int getMaxSendWidth() {
     return Integer.MAX_VALUE;
   }
-
-  @Override
-  public boolean supportsSelectionVector() {
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
new file mode 100644
index 0000000..8bae26f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("hash-to-merge-exchange")
+public class HashToMergeExchange extends AbstractExchange{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToMergeExchange.class);
+
+  
+  private final LogicalExpression distExpr;
+  private final List<Ordering> orderExprs;
+
+  //ephemeral for setup tasks.
+  private List<DrillbitEndpoint> senderLocations;
+  private List<DrillbitEndpoint> receiverLocations;
+  
+  @JsonCreator
+  public HashToMergeExchange(@JsonProperty("child") PhysicalOperator child, 
+      @JsonProperty("expr") LogicalExpression expr,
+      @JsonProperty("orderings") List<Ordering> orderExprs) {
+    super(child);
+    this.distExpr = expr;
+    this.orderExprs = orderExprs;
+  }
+
+  @Override
+  public int getMaxSendWidth() {
+    return Integer.MAX_VALUE;
+  }
+
+
+  @Override
+  protected void setupSenders(List<DrillbitEndpoint> senderLocations) {
+    this.senderLocations = senderLocations;
+  }
+
+  @Override
+  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) {
+    this.receiverLocations = receiverLocations;
+  }
+
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+    return new HashPartitionSender(receiverMajorFragmentId, child, distExpr, receiverLocations);
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    return new MergingReceiverPOP(senderMajorFragmentId, senderLocations, orderExprs);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new HashToMergeExchange(child, distExpr, orderExprs);
+  }
+
+  @JsonProperty("orderExpr")
+  public List<Ordering> getOrderExpressions(){
+    return orderExprs;
+  }
+
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
index 61c2f1a..7ad2b65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
@@ -79,11 +79,6 @@ public class HashToRandomExchange extends AbstractExchange{
     return new HashToRandomExchange(child, expr);
   }
 
-  @Override
-  public boolean supportsSelectionVector() {
-    return true;
-  }
-
   @JsonProperty("expr")
   public LogicalExpression getExpression(){
     return expr;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index c49509f..2b80262 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -100,14 +100,4 @@ public class OrderedPartitionExchange extends AbstractExchange {
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
     return new OrderedPartitionExchange(orderings, ref, child, recordsToSample, samplingFactor, completionFactor);
   }
-
-  @Override
-  public boolean supportsSelectionVector() {
-    return true;
-  }
-
-  
-
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index df13a84..26d881d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -84,14 +84,9 @@ public class SingleMergeExchange extends AbstractExchange {
     return new SingleMergeExchange(child, orderExpr);
   }
 
-  @Override
-  public boolean supportsSelectionVector() {
-    return true;
-  }
-
   @JsonProperty("orderings")
   public List<Ordering> getOrderings() {
     return this.orderExpr;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index 78270d8..c295ac3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -75,10 +75,4 @@ public class UnionExchange extends AbstractExchange{
     return new UnionExchange(child);
   }
 
-  @Override
-  public boolean supportsSelectionVector() {
-    return false;
-  }
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index ee5cfa8..aa6cd54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -137,7 +137,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       case CLEANUP_AND_RETURN:
         container.clear();
         aggregator.cleanup();
-        incoming.cleanup();
         done = true;
         return aggregator.getOutcome();
       case RETURN_OUTCOME:
@@ -277,6 +276,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    
   }
 
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    incoming.cleanup();
+  }
 
   @Override
   protected void killIncoming() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index c50a86a..5f26054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -95,7 +95,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   public class BatchHolder {
 
     private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
-    int maxOccupiedIdx = 0;
+    int maxOccupiedIdx = -1;
 
     private BatchHolder() {
 
@@ -288,6 +288,13 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   private void allocateOutgoing() {
+
+    // At present, since we output all records at once, we create the outgoing batch
+    // with a size of numGroupedRecords..however this has to be restricted to max of 64K right
+    // now otherwise downstream operators will break.
+    // TODO: allow outputting arbitrarily large number of records in batches
+    assert (numGroupedRecords < Character.MAX_VALUE);
+    
     for (VectorAllocator a : keyAllocators) {
       if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords);
       a.alloc(numGroupedRecords);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 23a0cf5..402e395 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -113,7 +113,7 @@ public abstract class HashTableTemplate implements HashTable {
     // Array of hash values - this is useful when resizing the hash table
     private IntVector hashValues;
 
-    int maxOccupiedIdx = 0;
+    int maxOccupiedIdx = -1;
 
     private BatchHolder(int idx) {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index db90085..9a055bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -153,8 +153,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         status.resetOutputPos();
 
       if (outcome == JoinOutcome.NO_MORE_DATA) {
-        left.cleanup();
-        right.cleanup();
         logger.debug("NO MORE DATA; returning {}  NONE");
         return IterOutcome.NONE;
       }
@@ -186,8 +184,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         kill();
         return IterOutcome.STOP;
       case NO_MORE_DATA:
-        left.cleanup();
-        right.cleanup();
         logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
         return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
       case SCHEMA_CHANGED:
@@ -222,6 +218,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     right.kill();
   }
 
+  @Override
+  public void cleanup() {
+      super.cleanup();
+
+      left.cleanup();
+      right.cleanup();
+  }
   
   private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch, 
       JVar incomingLeftRecordBatch, JVar joinStatus, ErrorCollector collector) throws ClassTransformationException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
index 955729b..865c0fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillFilterRelBase.java
@@ -17,21 +17,28 @@
  */
 package org.apache.drill.exec.planner.common;
 
+import java.util.List;
+
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.torel.ConversionContext;
 import org.eigenbase.rel.FilterRelBase;
 import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.Convention;
 import org.eigenbase.relopt.RelOptCluster;
 import org.eigenbase.relopt.RelOptCost;
 import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelOptUtil;
 import org.eigenbase.relopt.RelTraitSet;
 import org.eigenbase.rex.RexNode;
 
@@ -39,14 +46,28 @@ import org.eigenbase.rex.RexNode;
  * Base class for logical and physical Filters implemented in Drill
  */
 public abstract class DrillFilterRelBase extends FilterRelBase implements DrillRelNode {
+  int numConjuncts = 0;
+  
   protected DrillFilterRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
     super(cluster, traits, child, condition);
     assert getConvention() == convention;
+    
+    // save the number of conjuncts that make up the filter condition such 
+    // that repeated calls to the costing function can use the saved copy
+    numConjuncts = RelOptUtil.conjunctions(condition).size();
+    assert numConjuncts >= 1;
   }
   
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    return super.computeSelfCost(planner).multiplyBy(0.1);
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1); 
+    }
+    RelNode child = this.getChild();
+    double inputRows = RelMetadataQuery.getRowCount(child);
+    double cpuCost = DrillCostBase.COMPARE_CPU_COST * numConjuncts * inputRows;
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(inputRows, cpuCost, 0, 0);    
   }
 
   protected LogicalExpression getFilterExpression(DrillParseContext context){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
index b62eb9e..ab6bc02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
@@ -22,14 +22,20 @@ import java.util.List;
 
 import org.apache.drill.common.logical.data.Limit;
 import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.torel.ConversionContext;
 import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.RelWriter;
 import org.eigenbase.rel.SingleRel;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitSet;
 import org.eigenbase.rex.RexLiteral;
 import org.eigenbase.rex.RexNode;
@@ -56,6 +62,20 @@ public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNod
     return this.fetch;
   }
   
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1); 
+    }
+ 
+    int off = offset != null ? RexLiteral.intValue(offset) : 0 ;
+    int f = fetch != null ? RexLiteral.intValue(fetch) : 0 ;
+    double numRows = off + f;
+    double cpuCost = DrillCostBase.COMPARE_CPU_COST * numRows;
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(numRows, cpuCost, 0, 0);    
+  }
+  
   public RelWriter explainTerms(RelWriter pw) {
     super.explainTerms(pw);
     pw.itemIf("offset", offset, offset != null);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
index cf3d188..4629737 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
@@ -22,10 +22,14 @@ import java.util.List;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.eigenbase.rel.ProjectRelBase;
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.Convention;
 import org.eigenbase.relopt.RelOptCluster;
 import org.eigenbase.relopt.RelOptCost;
@@ -50,7 +54,15 @@ public abstract class DrillProjectRelBase extends ProjectRelBase implements Dril
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    return super.computeSelfCost(planner).multiplyBy(0.1);
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1); 
+    }
+    
+    // cost is proportional to the number of rows and number of columns being projected
+    double rowCount = RelMetadataQuery.getRowCount(this);
+    double cpuCost = DrillCostBase.PROJECT_CPU_COST * getRowType().getFieldCount();
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(rowCount, cpuCost, 0, 0);    
   }
 
   private List<Pair<RexNode, String>> projects() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index 88df658..331069d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -25,7 +25,9 @@ import org.eigenbase.relopt.RelOptTable;
 import org.eigenbase.relopt.RelTraitSet;
 
 /**
- * Base class for logical and physical Scans implemented in Drill
+ * Base class for logical scan rel implemented in Drill. 
+ * NOTE: we should eventually make this class independent of TableAccessRelBase and then 
+ * make it the base class for logical and physical scan rels.
  */
 public abstract class DrillScanRelBase extends TableAccessRelBase implements DrillRelNode {
   protected final DrillTable drillTable;
@@ -35,4 +37,5 @@ public abstract class DrillScanRelBase extends TableAccessRelBase implements Dri
     this.drillTable = table.unwrap(DrillTable.class);
     assert drillTable != null;
   }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
index 51ed442..23b53f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScreenRelBase.java
@@ -17,8 +17,12 @@
  */
 package org.apache.drill.exec.planner.common;
 
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.SingleRel;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.Convention;
 import org.eigenbase.relopt.RelOptCluster;
 import org.eigenbase.relopt.RelOptCost;
@@ -37,7 +41,13 @@ public abstract class DrillScreenRelBase extends SingleRel implements DrillRelNo
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    return super.computeSelfCost(planner).multiplyBy(.1);
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1); 
+    }
+    // by default, assume cost is proportional to number of rows
+    double rowCount = RelMetadataQuery.getRowCount(this);
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(rowCount, rowCount, 0, 0).multiplyBy(0.1);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
new file mode 100644
index 0000000..412c218
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillCostBase.java
@@ -0,0 +1,311 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.planner.cost;
+
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.util.Util;
+
+/**
+ * Implementation of the DrillRelOptCost, modeled similar to VolcanoCost
+ */
+public class DrillCostBase implements DrillRelOptCost {
+
+  /**
+   * NOTE: the multiplication factors below are not calibrated yet...these
+   * are chosen based on approximations for now. For reference purposes, 
+   * assume each disk on a server can have a sustained I/O throughput of 
+   * 100 MBytes/sec.  Suppose there is an array of 16 disks per server..theoretically
+   * one could get 1.6GBytes/sec. Suppose network speed is 1GBit/sec which is 
+   * 128MBytes/sec, although actual transfer rate over the network may be lower.
+   * We are only concerned with relative costs, not absolute values.  
+   * For relative costing, let's assume sending data over the network is
+   * about 16x slower than reading/writing to an array of local disks.
+   */
+  public static final int BASE_CPU_COST = 1;                        // base cpu cost per 'operation'
+  public static final int BYTE_DISK_READ_COST = 32 * BASE_CPU_COST;    // disk read cost per byte
+  public static final int BYTE_NETWORK_COST = 16 * BYTE_DISK_READ_COST; // network transfer cost per byte
+
+
+  public static final int SVR_CPU_COST = 8 * BASE_CPU_COST;          // cpu cost for SV remover
+  public static final int FUNC_CPU_COST = 12 * BASE_CPU_COST;         // cpu cost for a function evaluation
+
+  // cpu cost for projecting an expression; note that projecting an expression
+  // that is not a simple column or constant may include evaluation, but we 
+  // currently don't model it at that level of detail.  
+  public static final int PROJECT_CPU_COST = 4 * BASE_CPU_COST;       
+
+  // hash cpu cost per field (for now we don't distinguish between fields of different types) involves 
+  // the cost of the following operations: 
+  // compute hash value, probe hash table, walk hash chain and compare with each element,
+  // add to the end of hash chain if no match found
+  public static final int HASH_CPU_COST = 8 * BASE_CPU_COST;  
+ 
+  public static final int RANGE_PARTITION_CPU_COST = 12 * BASE_CPU_COST;
+      
+  // cost of comparing one field with another (ignoring data types for now) 
+  public static final int COMPARE_CPU_COST = 4 * BASE_CPU_COST;   
+  public static final int AVG_FIELD_WIDTH = 8;
+  
+  /** For the costing formulas in computeSelfCost(), assume the following notations: 
+  * Let 
+  *   C = Cost per node. 
+  *   k = number of fields on which to distribute on
+  *   h = CPU cost of computing hash value on 1 field 
+  *   s = CPU cost of Selection-Vector remover per row
+  *   w = Network cost of sending 1 row to 1 destination
+  *   c = CPU cost of comparing an incoming row with one on a heap of size N
+  */
+  
+  static final DrillCostBase INFINITY =
+      new DrillCostBase(
+          Double.POSITIVE_INFINITY,
+          Double.POSITIVE_INFINITY,
+          Double.POSITIVE_INFINITY, 
+          Double.POSITIVE_INFINITY) {
+        public String toString() {
+          return "{inf}";
+        }
+      };
+
+  static final DrillCostBase HUGE =
+      new DrillCostBase(Double.MAX_VALUE, 
+        Double.MAX_VALUE, 
+        Double.MAX_VALUE, 
+        Double.MAX_VALUE) {
+        public String toString() {
+          return "{huge}";
+        }
+      };
+
+  static final DrillCostBase ZERO =
+      new DrillCostBase(0.0, 0.0, 0.0, 0.0) {
+        public String toString() {
+          return "{0}";
+        }
+      };
+
+  static final DrillCostBase TINY =
+      new DrillCostBase(1.0, 1.0, 0.0, 0.0) {
+        public String toString() {
+          return "{tiny}";
+        }
+      };
+
+  final double rowCount;
+  final double cpu;
+  final double io;
+  final double network;
+  
+  public DrillCostBase(double rowCount, double cpu, double io, double network) {
+    this.rowCount = rowCount;
+    this.cpu = cpu;
+    this.io = io;
+    this.network = network;
+  }
+
+	@Override
+	public double getRows() {
+		return rowCount;
+	}
+
+	@Override
+	public double getCpu() {
+		return cpu;
+	}
+
+	@Override
+	public double getIo() {
+		return io;
+	}
+
+	@Override
+	public double getNetwork() {
+		return network;
+	}
+
+  @Override
+  public int hashCode() {
+    return Util.hashCode(rowCount) + Util.hashCode(cpu) + Util.hashCode(io) + Util.hashCode(network);
+  }
+
+	@Override
+	public boolean isInfinite() {
+    return (this == INFINITY)
+      || (this.cpu == Double.POSITIVE_INFINITY)
+      || (this.io == Double.POSITIVE_INFINITY)
+      || (this.network == Double.POSITIVE_INFINITY) 
+      || (this.rowCount == Double.POSITIVE_INFINITY);
+	}
+
+	@Override
+	public boolean equals(RelOptCost other) {
+	  // here we compare the individual components similar to VolcanoCost, however
+	  // an alternative would be to add up the components and compare the total.
+	  // Note that VolcanoPlanner mainly uses isLe() and isLt() for cost comparisons, 
+	  // not equals(). 
+    return this == other
+      || (other instanceof DrillCostBase
+          && (this.cpu == ((DrillCostBase) other).cpu)
+          && (this.io == ((DrillCostBase) other).io)
+          && (this.network == ((DrillCostBase) other).network)
+          && (this.rowCount == ((DrillCostBase) other).rowCount));
+	}
+
+	@Override
+	public boolean isEqWithEpsilon(RelOptCost other) {
+    if (!(other instanceof DrillCostBase)) {
+      return false;
+    }
+    DrillCostBase that = (DrillCostBase) other;
+    return (this == that) 
+      || ((Math.abs(this.cpu - that.cpu) < RelOptUtil.EPSILON)
+          && (Math.abs(this.io - that.io) < RelOptUtil.EPSILON)
+          && (Math.abs(this.network - that.network) < RelOptUtil.EPSILON)
+          && (Math.abs(this.rowCount - that.rowCount) < RelOptUtil.EPSILON));
+	}
+	
+  @Override
+  public boolean isLe(RelOptCost other) {
+    DrillCostBase that = (DrillCostBase) other;
+ 
+    return this == that 
+      || ( (this.cpu + this.io + this.network) <=
+           (that.cpu + that.io + that.network) 
+          && this.rowCount <= that.rowCount
+         );
+  }
+
+  @Override
+  public boolean isLt(RelOptCost other) {
+    DrillCostBase that = (DrillCostBase) other;
+
+     return ( (this.cpu + this.io + this.network) < 
+             (that.cpu + that.io + that.network) 
+            && this.rowCount <= that.rowCount
+           );
+  }
+	
+	@Override
+	public RelOptCost plus(RelOptCost other) {
+    DrillCostBase that = (DrillCostBase) other;
+    if ((this == INFINITY) || (that == INFINITY)) {
+      return INFINITY;
+    }
+    return new DrillCostBase(
+        this.rowCount + that.rowCount,
+        this.cpu + that.cpu,
+        this.io + that.io, 
+        this.network + that.network);
+	}
+
+	@Override
+	public RelOptCost minus(RelOptCost other) {
+    if (this == INFINITY) {
+      return this;
+    }
+    DrillCostBase that = (DrillCostBase) other;
+    return new DrillCostBase(
+        this.rowCount - that.rowCount,
+        this.cpu - that.cpu,
+        this.io - that.io, 
+        this.network - that.network);
+	}
+
+	@Override
+	public RelOptCost multiplyBy(double factor) {
+    if (this == INFINITY) {
+      return this;
+    }
+    return new DrillCostBase(rowCount * factor, cpu * factor, io * factor, network * factor);
+	}
+
+	@Override
+	public double divideBy(RelOptCost cost) {
+    // Compute the geometric average of the ratios of all of the factors
+    // which are non-zero and finite.
+    DrillCostBase that = (DrillCostBase) cost;
+    double d = 1;
+    double n = 0;
+    if ((this.rowCount != 0)
+        && !Double.isInfinite(this.rowCount)
+        && (that.rowCount != 0)
+        && !Double.isInfinite(that.rowCount)) {
+      d *= this.rowCount / that.rowCount;
+      ++n;
+    }
+    if ((this.cpu != 0)
+        && !Double.isInfinite(this.cpu)
+        && (that.cpu != 0)
+        && !Double.isInfinite(that.cpu)) {
+      d *= this.cpu / that.cpu;
+      ++n;
+    }
+    if ((this.io != 0)
+        && !Double.isInfinite(this.io)
+        && (that.io != 0)
+        && !Double.isInfinite(that.io)) {
+      d *= this.io / that.io;
+      ++n;
+    }
+    if ((this.network != 0)
+        && !Double.isInfinite(this.network)
+        && (that.network != 0)
+        && !Double.isInfinite(that.network)) {
+      d *= this.network / that.network;
+      ++n;
+    }
+
+    if (n == 0) {
+      return 1.0;
+    }
+    return Math.pow(d, 1 / n);
+	}
+
+  public String toString() {
+    return "{" + rowCount + " rows, " + cpu + " cpu, " + io + " io, " + network + " network}";
+  }
+  
+  public static class DrillCostFactory implements DrillRelOptCostFactory {
+    public RelOptCost makeCost(double dRows, double dCpu, double dIo, double dNetwork) {
+      return new DrillCostBase(dRows, dCpu, dIo, dNetwork);
+    }
+		
+    public RelOptCost makeCost(double dRows, double dCpu, double dIo) {
+      return new DrillCostBase(dRows, dCpu, dIo, 0);
+    }
+		
+    public RelOptCost makeHugeCost() {
+      return DrillCostBase.HUGE;
+    }
+
+    public RelOptCost makeInfiniteCost() {
+      return DrillCostBase.INFINITY;
+    }
+
+    public RelOptCost makeTinyCost() {
+      return DrillCostBase.TINY;
+    }
+
+    public RelOptCost makeZeroCost() {
+      return DrillCostBase.ZERO;
+    }
+  }	
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
new file mode 100644
index 0000000..8f20658
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.planner.cost;
+
+import org.eigenbase.relopt.RelOptCost;
+
+public interface DrillRelOptCost extends RelOptCost {
+	
+	double getNetwork();
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java
new file mode 100644
index 0000000..fc20d60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCostFactory.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+
+package org.apache.drill.exec.planner.cost;
+
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptCostFactory;
+
+public interface DrillRelOptCostFactory extends RelOptCostFactory {
+
+  /**
+   * Creates a cost object.
+   */
+  RelOptCost makeCost(double rowCount, double cpu, double io, double network);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
index c2833c1..fe5130c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -49,11 +49,6 @@ public class DrillAggregateRel extends DrillAggregateRelBase implements DrillRel
   public DrillAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
       List<AggregateCall> aggCalls) throws InvalidRelException {
     super(cluster, traits, child, groupSet, aggCalls);
-    for (AggregateCall aggCall : aggCalls) {
-      if (aggCall.isDistinct()) {
-        throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
-      }
-    }
   }
 
   public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
index 273237a..5d65e7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
@@ -41,6 +41,12 @@ public class DrillAggregateRule extends RelOptRule {
   public void onMatch(RelOptRuleCall call) {
     final AggregateRel aggregate = (AggregateRel) call.rel(0);
     final RelNode input = call.rel(1);
+
+    if (aggregate.containsDistinctCall()) {
+      // currently, don't use this rule if any of the aggregates contains DISTINCT
+      return;
+    }
+    
     final RelTraitSet traits = aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
     final RelNode convertedInput = convert(input, traits);
     try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 4defacd..0693d22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -22,6 +22,17 @@ import java.util.Iterator;
 import net.hydromatic.optiq.tools.RuleSet;
 
 import org.apache.drill.exec.planner.physical.*;
+import org.apache.drill.exec.planner.physical.FilterPrule;
+import org.apache.drill.exec.planner.physical.HashAggPrule;
+import org.apache.drill.exec.planner.physical.HashJoinPrule;
+import org.apache.drill.exec.planner.physical.LimitPrule;
+import org.apache.drill.exec.planner.physical.MergeJoinPrule;
+import org.apache.drill.exec.planner.physical.ProjectPrule;
+import org.apache.drill.exec.planner.physical.ScanPrule;
+import org.apache.drill.exec.planner.physical.ScreenPrule;
+import org.apache.drill.exec.planner.physical.SortConvertPrule;
+import org.apache.drill.exec.planner.physical.SortPrule;
+import org.apache.drill.exec.planner.physical.StreamAggPrule;
 import org.eigenbase.rel.RelFactories;
 import org.eigenbase.rel.rules.MergeProjectRule;
 import org.eigenbase.rel.rules.PushFilterPastJoinRule;
@@ -49,7 +60,7 @@ public class DrillRuleSets {
       PushFilterPastJoinRule.FILTER_ON_JOIN,
       PushJoinThroughJoinRule.RIGHT,
       PushJoinThroughJoinRule.LEFT,
-      // End supprot for WHERE style joins.
+      // End support for WHERE style joins.
 
       //Add back rules
 
@@ -106,7 +117,9 @@ public class DrillRuleSets {
       ScreenPrule.INSTANCE,
       ExpandConversionRule.INSTANCE,
       StreamAggPrule.INSTANCE,
+      HashAggPrule.INSTANCE,
       MergeJoinPrule.INSTANCE,
+      HashJoinPrule.INSTANCE,
       FilterPrule.INSTANCE,
       LimitPrule.INSTANCE,
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 46394a9..ae11564 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.torel.ConversionContext;
 import org.eigenbase.relopt.RelOptCluster;
@@ -90,21 +91,39 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
     return this.rowType;
   }
 
-  @Override
-  public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    OperatorCost scanCost = groupScan.getCost();
-    Size scanSize = groupScan.getSize();
-    int columnCount = this.getRowType().getFieldCount();
 
-    // FIXME: Use the new cost model
-    return this
-        .getCluster()
-        .getPlanner()
-        .getCostFactory()
-        .makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(),
-            scanCost.getNetwork() * scanCost.getDisk());
+  @Override
+  public double getRows() {
+    return this.groupScan.getSize().getRecordCount();
   }
 
+  /// TODO: this method is same as the one for ScanPrel...eventually we should consolidate
+  /// this and few other methods in a common base class which would be extended
+  /// by both logical and physical rels.
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    Size scanSize = this.groupScan.getSize();
+    int columnCount = this.getRowType().getFieldCount();   
+    
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      OperatorCost scanCost = this.groupScan.getCost();
+      return planner.getCostFactory().makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(), scanCost.getDisk());
+    }
+    
+    // double rowCount = RelMetadataQuery.getRowCount(this);
+    double rowCount = scanSize.getRecordCount();
+    
+    double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count. 
+    // Even though scan is reading from disk, in the currently generated plans all plans will
+    // need to read the same amount of data, so keeping the disk io cost 0 is ok for now.  
+    // In the future we might consider alternative scans that go against projections or 
+    // different compression schemes etc that affect the amount of data read. Such alternatives
+    // would affect both cpu and io cost. 
+    double ioCost = 0;
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(rowCount, cpuCost, ioCost, 0);   
+  }  
+  
   public GroupScan getGroupScan() {
     return groupScan;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
new file mode 100644
index 0000000..235018d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+
+import net.hydromatic.optiq.util.BitSets;
+
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleOperand;
+
+import com.google.common.collect.Lists;
+
+// abstract base class for the aggregation physical rules  
+public abstract class AggPruleBase extends RelOptRule {
+
+  protected AggPruleBase(RelOptRuleOperand operand, String description) {
+    super(operand, description);   
+  }
+
+  protected List<DistributionField> getDistributionField(DrillAggregateRel rel, boolean allFields) {
+    List<DistributionField> groupByFields = Lists.newArrayList();
+
+    for (int group : BitSets.toIter(rel.getGroupSet())) {
+      DistributionField field = new DistributionField(group);
+      groupByFields.add(field);
+
+      if (!allFields && groupByFields.size() == 1) {
+        // if we are only interested in 1 grouping field, pick the first one for now..
+        // but once we have num distinct values (NDV) statistics, we should pick the one
+        // with highest NDV. 
+        break;
+      }
+    }    
+    
+    return groupByFields;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
new file mode 100644
index 0000000..292c1ba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class BroadcastExchangePrel extends SingleRel implements Prel {
+  
+  public BroadcastExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
+    super(cluster, traitSet, input);
+    assert input.getConvention() == Prel.DRILL_PHYSICAL;
+  }
+
+  /**
+   * In a BroadcastExchange, each sender is sending data to N receivers (for costing
+   * purposes we assume it is also sending to itself). 
+   */
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1); 
+    }
+
+    RelNode child = this.getChild();
+   
+    double inputRows = RelMetadataQuery.getRowCount(child);
+    int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
+    double cpuCost = DrillCostBase.SVR_CPU_COST * inputRows ;
+    int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints();
+    double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;
+    return new DrillCostBase(inputRows, cpuCost, 0, networkCost);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new BroadcastExchangePrel(getCluster(), traitSet, sole(inputs));
+  }
+  
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    Prel child = (Prel) this.getChild();
+    
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+    
+    //Currently, only accepts "NONE". For other, requires SelectionVectorRemover
+    if (!childPOP.getSVMode().equals(SelectionVectorMode.NONE)) {
+      childPOP = new SelectionVectorRemover(childPOP);
+    }
+
+    BroadcastExchange g = new BroadcastExchange(childPOP);
+    return g;    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
index b75fb40..abd50d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.planner.physical;
 
-import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTrait;
 import org.eigenbase.relopt.RelTraitDef;
@@ -36,8 +35,8 @@ public class DrillDistributionTrait implements RelTrait {
 
   private DistributionType type;
   private final ImmutableList<DistributionField> fields;
-
-  private DrillDistributionTrait(DistributionType type) {
+  
+  public DrillDistributionTrait(DistributionType type) {
     assert (type == DistributionType.SINGLETON || type == DistributionType.RANDOM_DISTRIBUTED || type == DistributionType.ANY
             || type == DistributionType.ROUND_ROBIN_DISTRIBUTED || type == DistributionType.BROADCAST_DISTRIBUTED);
     this.type = type;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
index c2ebb7a..06c2319 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
@@ -17,9 +17,6 @@
  */
 package org.apache.drill.exec.planner.physical;
 
-import org.eigenbase.rel.RelCollation;
-import org.eigenbase.rel.RelCollationImpl;
-import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitDef;
@@ -47,7 +44,7 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
   public String getSimpleName() {
     return this.getClass().getSimpleName();
   }
-
+  
   // implement RelTraitDef
   public RelNode convert(
       RelOptPlanner planner,
@@ -66,19 +63,21 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
     // We do not want to convert from "ANY", since it's abstract. 
     // Source trait should be concrete type: SINGLETON, HASH_DISTRIBUTED, etc.
     if (currentDist.equals(DrillDistributionTrait.DEFAULT)) {
-      return null;
+        return null;
     }
     
-    RelCollation collation = null;
-    
     switch(toDist.getType()){
-      // UnionExchange, HashToRandomExchange, OrderedPartitionExchange destroy the ordering property, therefore RelCollation is set to default, which is EMPTY.
+      // UnionExchange, HashToRandomExchange, OrderedPartitionExchange and BroadcastExchange destroy the ordering property,
+      // therefore RelCollation is set to default, which is EMPTY.
       case SINGLETON:         
         return new UnionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
       case HASH_DISTRIBUTED:
-        return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel, toDist.getFields());
+        return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel, 
+                                            toDist.getFields());
       case RANGE_DISTRIBUTED:
         return new OrderedPartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
+      case BROADCAST_DISTRIBUTED:
+        return new BroadcastExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 0fc3abd..99a0cdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -24,9 +24,14 @@ import java.util.List;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitSet;
 import org.eigenbase.rex.RexNode;
 
@@ -40,9 +45,10 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     return new FilterPrel(getCluster(), traitSet, sole(inputs), getCondition());
   }
-
+  
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+
     Prel child = (Prel) this.getChild();
 
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
new file mode 100644
index 0000000..066bed0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+import com.beust.jcommander.internal.Lists;
+
+public class HashAggPrel extends AggregateRelBase implements Prel{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggPrel.class);
+
+  public HashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+      List<AggregateCall> aggCalls) throws InvalidRelException {
+    super(cluster, traits, child, groupSet, aggCalls);
+  }
+
+  public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
+    try {
+      return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+    } catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1); 
+    }
+    RelNode child = this.getChild();
+    double inputRows = RelMetadataQuery.getRowCount(child);
+
+    int numGroupByFields = this.getGroupCount();
+    int numAggrFields = this.aggCalls.size();
+    // cpu cost of hashing each grouping key
+    double cpuCost = DrillCostBase.HASH_CPU_COST * numGroupByFields * inputRows;
+    // add cpu cost for computing the aggregate functions
+    cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows;
+    double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */);    
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    
+    final List<String> childFields = getChild().getRowType().getFieldNames();
+    final List<String> fields = getRowType().getFieldNames();
+    List<NamedExpression> keys = Lists.newArrayList();
+    List<NamedExpression> exprs = Lists.newArrayList();
+    
+    for (int group : BitSets.toIter(groupSet)) {
+      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+      keys.add(new NamedExpression(fr, fr));
+    }
+    
+    for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+      FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
+      LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
+      exprs.add(new NamedExpression(expr, ref));
+    }
+
+    Prel child = (Prel) this.getChild();
+    HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), 
+        keys.toArray(new NamedExpression[keys.size()]), 
+        exprs.toArray(new NamedExpression[exprs.size()]), 
+        1.0f);
+    
+    return g;    
+
+  }
+  
+  private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
+    List<LogicalExpression> args = Lists.newArrayList();
+    for(Integer i : call.getArgList()){
+      args.add(new FieldReference(fn.get(i)));
+    }
+    
+    // for count(1).
+    if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
+    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
+    return expr;
+  }
+ 
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
new file mode 100644
index 0000000..22b33ea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.logging.Logger;
+
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import com.google.common.collect.ImmutableList;
+
+public class HashAggPrule extends AggPruleBase {
+  public static final RelOptRule INSTANCE = new HashAggPrule();
+  protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+  private HashAggPrule() {
+    super(RelOptHelper.some(DrillAggregateRel.class, RelOptHelper.any(DrillRel.class)), "Prel.HashAggPrule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
+    final RelNode input = call.rel(1);
+
+    if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
+      // currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or 
+      // if there are no grouping keys 
+      return;
+    }
+    
+    DrillDistributionTrait toDist = null;
+    RelTraitSet traits = null;
+
+    try {
+      if (aggregate.getGroupSet().isEmpty()) {
+        toDist = DrillDistributionTrait.SINGLETON;
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        createTransformRequest(call, aggregate, input, traits);
+      } else {
+        // hash distribute on all grouping keys
+        toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+                                            ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
+    
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        createTransformRequest(call, aggregate, input, traits);
+
+        // hash distribute on single grouping key
+        toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+                                            ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
+    
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        createTransformRequest(call, aggregate, input, traits);
+        
+        ///TODO: 2 phase hash aggregate plan 
+      } 
+    } catch (InvalidRelException e) {
+      tracer.warning(e.toString());
+    }
+  }
+
+  private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate, 
+                                      RelNode input, RelTraitSet traits) throws InvalidRelException {
+
+    final RelNode convertedInput = convert(input, traits);
+    
+    HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
+                                         aggregate.getAggCallList());
+      
+    call.transformTo(newAgg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3fa1322b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
new file mode 100644
index 0000000..7c68c8d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRelBase;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.util.Pair;
+
+import com.beust.jcommander.internal.Lists;
+
+public class HashJoinPrel  extends DrillJoinRelBase implements Prel {
+
+  public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+      JoinRelType joinType) throws InvalidRelException {
+    super(cluster, traits, left, right, condition, joinType);
+
+    RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
+  }
+
+
+  @Override
+  public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType) {
+    try {
+      return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType);
+    }catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1); 
+    }
+    double probeRowCount = RelMetadataQuery.getRowCount(this.getLeft());
+    double buildRowCount = RelMetadataQuery.getRowCount(this.getRight());
+    
+    // cpu cost of hashing the join keys for the build side
+    double cpuCostBuild = DrillCostBase.HASH_CPU_COST * getRightKeys().size() * buildRowCount;
+    // cpu cost of hashing the join keys for the probe side
+    double cpuCostProbe = DrillCostBase.HASH_CPU_COST * getLeftKeys().size() * probeRowCount;
+      
+    // cpu cost of evaluating each leftkey=rightkey join condition
+    double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * this.getLeftKeys().size();
+    
+    double cpuCost = joinConditionCost * (buildRowCount + probeRowCount) + cpuCostBuild + cpuCostProbe;
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(buildRowCount + probeRowCount, cpuCost, 0, 0);    
+  }
+
+  @Override  
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {    
+    final List<String> fields = getRowType().getFieldNames();
+    assert isUnique(fields);
+    final int leftCount = left.getRowType().getFieldCount();
+    final List<String> leftFields = fields.subList(0, leftCount);
+    final List<String> rightFields = fields.subList(leftCount, fields.size());
+
+    PhysicalOperator leftPop = implementInput(creator, 0, left);
+    PhysicalOperator rightPop = implementInput(creator, leftCount, right);
+
+    //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
+    leftPop = PrelUtil.removeSvIfRequired(leftPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
+
+    //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
+    rightPop = PrelUtil.removeSvIfRequired(rightPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
+
+    JoinRelType jtype = this.getJoinType();
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+      conditions.add(new JoinCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right))));
+    }
+
+    HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype);
+
+    return hjoin;
+  }
+
+  public List<Integer> getLeftKeys() {
+    return this.leftKeys;
+  }
+
+  public List<Integer> getRightKeys() {
+    return this.rightKeys;
+  }
+
+  /**
+   * Check to make sure that the fields of the inputs are the same as the output field names.  If not, insert a project renaming them.
+   * @param implementor
+   * @param i
+   * @param offset
+   * @param input
+   * @return
+   */
+  private PhysicalOperator implementInput(PhysicalPlanCreator creator, int offset, RelNode input) throws IOException {
+    final PhysicalOperator inputOp = ((Prel) input).getPhysicalOperator(creator);
+    assert uniqueFieldNames(input.getRowType());
+    final List<String> fields = getRowType().getFieldNames();
+    final List<String> inputFields = input.getRowType().getFieldNames();
+    final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+    if (!outputFields.equals(inputFields)) {
+      // Ensure that input field names are the same as output field names.
+      // If there are duplicate field names on left and right, fields will get
+      // lost.
+      return rename(creator, inputOp, inputFields, outputFields);
+    } else {
+      return inputOp;
+    }
+  }
+
+  private PhysicalOperator rename(PhysicalPlanCreator creator, PhysicalOperator inputOp, List<String> inputFields, List<String> outputFields) {
+    List<NamedExpression> exprs = Lists.newArrayList();
+
+    //Currently, Project only accepts "NONE". For other, requires SelectionVectorRemover
+    inputOp = PrelUtil.removeSvIfRequired(inputOp, SelectionVectorMode.NONE);
+
+    for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) {
+      exprs.add(new NamedExpression(new FieldReference(pair.left), new FieldReference(pair.right)));
+    }
+
+    Project proj = new Project(exprs, inputOp);
+
+    return proj;
+  }
+
+
+}