You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/05/04 23:38:25 UTC

[4/4] drill git commit: DRILL-4132 Ability to submit simple type of physical plan directly to EndPoint DrillBit for execution. There are multiple changes to achieve this: 1. During physical planning split single plan into multiple based on the number of

DRILL-4132 Ability to submit simple type of physical plan directly to EndPoint DrillBit for execution. There are multiple changes to achieve this: 1. During physical planning split single plan into multiple based on the number of minor fragments of the Leaf Major fragment. a. Removing exchange operators during planning b. Producing just root fragments (that will be also leaf fragments) 2. Each fragment can be executed against Drillbit it is assigned to, so to keep locality Design document can be found in the JIRA: DRILL-4132


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6bba69d4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6bba69d4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6bba69d4

Branch: refs/heads/master
Commit: 6bba69d485b5fafee93a5fd121a0ebc65dd3b5c1
Parents: f07f328
Author: Yuliya Feldman <yf...@maprtech.com>
Authored: Thu Feb 4 14:09:21 2016 -0800
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed May 4 16:35:18 2016 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   |   55 +
 .../planner/fragment/SimpleParallelizer.java    |   43 +-
 .../contrib/ExchangeRemoverMaterializer.java    |   96 +
 .../fragment/contrib/OperatorIdVisitor.java     |   55 +
 .../fragment/contrib/SplittingParallelizer.java |  228 ++
 .../apache/drill/exec/rpc/user/UserClient.java  |   15 +
 .../drill/exec/rpc/user/UserRpcConfig.java      |    4 +
 .../apache/drill/exec/rpc/user/UserServer.java  |    9 +-
 .../exec/util/MemoryAllocationUtilities.java    |   69 +
 .../org/apache/drill/exec/util/Utilities.java   |    2 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  105 +-
 .../drill/exec/work/user/PlanSplitter.java      |  133 +
 .../apache/drill/exec/work/user/UserWorker.java |   27 +-
 .../drill/exec/DrillSeparatePlanningTest.java   |  344 +++
 .../org/apache/drill/exec/proto/ExecProtos.java |  205 +-
 .../drill/exec/proto/SchemaExecProtos.java      |    9 +
 .../drill/exec/proto/SchemaUserProtos.java      |  272 ++
 .../apache/drill/exec/proto/UserBitShared.java  |   60 +-
 .../org/apache/drill/exec/proto/UserProtos.java | 2414 +++++++++++++++++-
 .../drill/exec/proto/beans/FragmentHandle.java  |   24 +
 .../exec/proto/beans/GetQueryPlanFragments.java |  218 ++
 .../exec/proto/beans/QueryPlanFragments.java    |  255 ++
 .../drill/exec/proto/beans/QueryType.java       |    4 +-
 .../apache/drill/exec/proto/beans/RpcType.java  |    4 +
 .../apache/drill/exec/proto/beans/RunQuery.java |   34 +
 .../src/main/protobuf/ExecutionProtos.proto     |    1 +
 protocol/src/main/protobuf/User.proto           |   20 +
 protocol/src/main/protobuf/UserBitShared.proto  |    1 +
 28 files changed, 4543 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index f83285e..11abbcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.client;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
 import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
 import io.netty.buffer.DrillBuf;
@@ -44,6 +45,7 @@ import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
@@ -51,7 +53,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.Property;
+import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
@@ -67,6 +71,9 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -78,6 +85,7 @@ import com.google.common.util.concurrent.SettableFuture;
 public class DrillClient implements Closeable, ConnectionThrottle {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
 
+  private static final ObjectMapper objectMapper = new ObjectMapper();
   private final DrillConfig config;
   private UserClient client;
   private UserProperties props = null;
@@ -321,6 +329,53 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     return listener.getResults();
   }
 
+  /**
+   * API to just plan a query without execution
+   * @param type
+   * @param query
+   * @param isSplitPlan - option to tell whether to return single or split plans for a query
+   * @return list of PlanFragments that can be used later on in {@link #runQuery(QueryType, List, UserResultsListener)}
+   * to run a query without additional planning
+   */
+  public DrillRpcFuture<QueryPlanFragments> planQuery(QueryType type, String query, boolean isSplitPlan) {
+    GetQueryPlanFragments runQuery = GetQueryPlanFragments.newBuilder().setQuery(query).setType(type).setSplitPlan(isSplitPlan).build();
+    return client.planQuery(runQuery);
+  }
+
+  /**
+   * Run query based on list of fragments that were supposedly produced during query planning phase
+   * @param type
+   * @param planFragments
+   * @param resultsListener
+   * @throws RpcException
+   */
+  public void runQuery(QueryType type, List<PlanFragment> planFragments, UserResultsListener resultsListener)
+      throws RpcException {
+    // QueryType can be only executional
+    checkArgument((QueryType.EXECUTION == type), "Only EXECUTION type query is supported with PlanFragments");
+    // setting Plan on RunQuery will be used for logging purposes and therefore can not be null
+    // since there is no Plan string provided we will create a JsonArray out of individual fragment Plans
+    ArrayNode jsonArray = objectMapper.createArrayNode();
+    for (PlanFragment fragment : planFragments) {
+      try {
+        jsonArray.add(objectMapper.readTree(fragment.getFragmentJson()));
+      } catch (IOException e) {
+        logger.error("Exception while trying to read PlanFragment JSON for %s", fragment.getHandle().getQueryId(), e);
+        throw new RpcException(e);
+      }
+    }
+    final String fragmentsToJsonString;
+    try {
+      fragmentsToJsonString = objectMapper.writeValueAsString(jsonArray);
+    } catch (JsonProcessingException e) {
+      logger.error("Exception while trying to get JSONString from Array of individual Fragments Json for %s", e);
+      throw new RpcException(e);
+    }
+    final UserProtos.RunQuery query = newBuilder().setType(type).addAllFragments(planFragments)
+        .setPlan(fragmentsToJsonString)
+        .setResultsMode(STREAM_FULL).build();
+    client.submitQuery(resultsListener, query);
+  }
 
   /*
    * Helper method to generate the UserCredentials message from the properties.

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 9aad9a3..47de88e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -123,7 +123,40 @@ public class SimpleParallelizer implements ParallelizationParameters {
       Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
       UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
 
-    final PlanningSet planningSet = new PlanningSet();
+    final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
+    return generateWorkUnit(
+        options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
+  }
+
+  /**
+   * Create multiple physical plans from original query planning, it will allow execute them eventually independently
+   * @param options
+   * @param foremanNode
+   * @param queryId
+   * @param activeEndpoints
+   * @param reader
+   * @param rootFragment
+   * @param session
+   * @param queryContextInfo
+   * @return
+   * @throws ExecutionSetupException
+   */
+  public List<QueryWorkUnit> getSplitFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+      Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
+      UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+    // no op
+    throw new UnsupportedOperationException("Use children classes");
+  }
+  /**
+   * Helper method to reuse the code for QueryWorkUnit(s) generation
+   * @param activeEndpoints
+   * @param rootFragment
+   * @return
+   * @throws ExecutionSetupException
+   */
+  protected PlanningSet getFragmentsHelper(Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment) throws ExecutionSetupException {
+
+    PlanningSet planningSet = new PlanningSet();
 
     initFragmentWrappers(rootFragment, planningSet);
 
@@ -134,8 +167,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
       parallelizeFragment(wrapper, planningSet, activeEndpoints);
     }
 
-    return generateWorkUnit(
-        options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
+    return planningSet;
   }
 
   // For every fragment, create a Wrapper in PlanningSet.
@@ -221,7 +253,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
         .parallelizeFragment(fragmentWrapper, this, activeEndpoints);
   }
 
-  private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+  protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
       PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
       UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
     List<PlanFragment> fragments = Lists.newArrayList();
@@ -297,10 +329,11 @@ public class SimpleParallelizer implements ParallelizationParameters {
     return new QueryWorkUnit(rootOperator, rootFragment, fragments);
   }
 
+
   /**
    * Designed to setup initial values for arriving fragment accounting.
    */
-  private static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
+  protected static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
     private static final CountRequiredFragments INSTANCE = new CountRequiredFragments();
 
     public static List<Collector> getCollectors(PhysicalOperator root) {

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/ExchangeRemoverMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/ExchangeRemoverMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/ExchangeRemoverMaterializer.java
new file mode 100644
index 0000000..e2cffd8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/ExchangeRemoverMaterializer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment.contrib;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.fragment.Materializer;
+import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Materializer visitor to remove exchange(s)
+ * NOTE: this Visitor does NOT set OperatorId, as after Exchange removal all operators need renumbering
+ * Use OperatorIdVisitor on top to set correct OperatorId
+ */
+public class ExchangeRemoverMaterializer extends AbstractPhysicalVisitor<PhysicalOperator, Materializer.IndexedFragmentNode, ExecutionSetupException> {
+
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeRemoverMaterializer.class);
+
+  public static final ExchangeRemoverMaterializer INSTANCE = new ExchangeRemoverMaterializer();
+
+  private ExchangeRemoverMaterializer() {
+
+  }
+
+  @Override
+  public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException {
+    iNode.addAllocation(exchange);
+    PhysicalOperator childEx = exchange.getChild().accept(this, iNode);
+    return childEx;
+  }
+
+  @Override
+  public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException {
+    PhysicalOperator child = groupScan.getSpecificScan(iNode.getMinorFragmentId());
+    return child;
+  }
+
+  @Override
+  public PhysicalOperator visitSubScan(SubScan subScan, IndexedFragmentNode value) throws ExecutionSetupException {
+    value.addAllocation(subScan);
+    // TODO - implement this
+    return super.visitOp(subScan, value);
+  }
+
+  @Override
+  public PhysicalOperator visitStore(Store store, IndexedFragmentNode iNode) throws ExecutionSetupException {
+    PhysicalOperator child = store.getChild().accept(this, iNode);
+
+    iNode.addAllocation(store);
+
+    try {
+      PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
+      return o;
+    } catch (PhysicalOperatorSetupException e) {
+      throw new FragmentSetupException("Failure while generating a specific Store materialization.", e);
+    }
+  }
+
+  @Override
+  public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) throws ExecutionSetupException {
+    iNode.addAllocation(op);
+    List<PhysicalOperator> children = Lists.newArrayList();
+    for(PhysicalOperator child : op){
+      children.add(child.accept(this, iNode));
+    }
+    PhysicalOperator newOp = op.getNewWithChildren(children);
+    newOp.setCost(op.getCost());
+    return newOp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/OperatorIdVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/OperatorIdVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/OperatorIdVisitor.java
new file mode 100644
index 0000000..0a0f215
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/OperatorIdVisitor.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment.contrib;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
+
+/**
+ * Visitor to renumber operators - needed after materialization is done as some operators may be removed
+ * using @ExtendedMaterializerVisitor
+ *
+ */
+public class OperatorIdVisitor extends AbstractPhysicalVisitor<PhysicalOperator, Integer, ExecutionSetupException> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorIdVisitor.class);
+
+  public static final OperatorIdVisitor INSTANCE = new OperatorIdVisitor();
+
+  private OperatorIdVisitor() {
+
+  }
+
+  @Override
+  public PhysicalOperator visitSubScan(SubScan subScan, Integer parentOpId) throws ExecutionSetupException {
+    subScan.setOperatorId(Short.MAX_VALUE & parentOpId+1);
+    return subScan;
+  }
+
+  @Override
+  public PhysicalOperator visitOp(PhysicalOperator op, Integer parentOpId) throws ExecutionSetupException {
+    for(PhysicalOperator child : op){
+      child.accept(this, parentOpId+1);
+    }
+    op.setOperatorId(Short.MAX_VALUE & parentOpId+1);
+    return op;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
new file mode 100644
index 0000000..3488e7f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment.contrib;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * SimpleParallelizerMultiPlans class is an extension to SimpleParallelizer
+ * to help with getting PlanFragments for split plan.
+ * Split plan is essentially ability to create multiple Physical Operator plans from original Physical Operator plan
+ * to be able to run plans separately.
+ * Moving functionality specific to splitting the plan to this class
+ * allows not to pollute parent class with non-authentic functionality
+ *
+ */
+public class SplittingParallelizer extends SimpleParallelizer {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class);
+
+  public SplittingParallelizer(QueryContext context) {
+    super(context);
+  }
+
+  /**
+   * Create multiple physical plans from original query planning, it will allow execute them eventually independently
+   * @param options
+   * @param foremanNode
+   * @param queryId
+   * @param activeEndpoints
+   * @param reader
+   * @param rootFragment
+   * @param session
+   * @param queryContextInfo
+   * @return
+   * @throws ExecutionSetupException
+   */
+  public List<QueryWorkUnit> getSplitFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+      Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
+      UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+
+    final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
+
+    return generateWorkUnits(
+        options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
+  }
+
+  /**
+   * Split plan into multiple plans based on parallelization
+   * Ideally it is applicable only to plans with two major fragments: Screen and UnionExchange
+   * But there could be cases where we can remove even multiple exchanges like in case of "order by"
+   * End goal is to get single major fragment: Screen with chain that ends up with a single minor fragment
+   * from Leaf Exchange. This way each plan can run independently without any exchange involvement
+   * @param options
+   * @param foremanNode - not really applicable
+   * @param queryId
+   * @param reader
+   * @param rootNode
+   * @param planningSet
+   * @param session
+   * @param queryContextInfo
+   * @return
+   * @throws ExecutionSetupException
+   */
+  private List<QueryWorkUnit> generateWorkUnits(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
+      UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+
+    // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints
+    // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one.
+
+    List<QueryWorkUnit> workUnits = Lists.newArrayList();
+    int plansCount = 0;
+    DrillbitEndpoint[] endPoints = null;
+    long initialAllocation = 0;
+    long maxAllocation = 0;
+
+    final Iterator<Wrapper> iter = planningSet.iterator();
+    while (iter.hasNext()) {
+      Wrapper wrapper = iter.next();
+      Fragment node = wrapper.getNode();
+      boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0;
+      final PhysicalOperator physicalOperatorRoot = node.getRoot();
+      // get all the needed info from leaf fragment
+      if ( (physicalOperatorRoot instanceof Exchange) &&  isLeafFragment) {
+        // need to get info about
+        // number of minor fragments
+        // assignedEndPoints
+        // allocation
+        plansCount = wrapper.getWidth();
+        initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0;
+        maxAllocation = (wrapper.getMaxAllocation() != 0 ) ? wrapper.getMaxAllocation()/plansCount : 0;
+        endPoints = new DrillbitEndpoint[plansCount];
+        for (int mfId = 0; mfId < plansCount; mfId++) {
+          endPoints[mfId] = wrapper.getAssignedEndpoint(mfId);
+        }
+      }
+    }
+    if ( plansCount == 0 ) {
+      // no exchange, return list of single QueryWorkUnit
+      workUnits.add(generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session, queryContextInfo));
+      return workUnits;
+    }
+
+    for (Wrapper wrapper : planningSet) {
+      Fragment node = wrapper.getNode();
+      final PhysicalOperator physicalOperatorRoot = node.getRoot();
+      if ( physicalOperatorRoot instanceof Exchange ) {
+        // get to 0 MajorFragment
+        continue;
+      }
+      boolean isRootNode = rootNode == node;
+
+      if (isRootNode && wrapper.getWidth() != 1) {
+        throw new ForemanSetupException(String.format("Failure while trying to setup fragment. " +
+                "The root fragment must always have parallelization one. In the current case, the width was set to %d.",
+                wrapper.getWidth()));
+      }
+      // this fragment is always leaf, as we are removing all the exchanges
+      boolean isLeafFragment = true;
+
+      FragmentHandle handle = FragmentHandle //
+          .newBuilder() //
+          .setMajorFragmentId(wrapper.getMajorFragmentId()) //
+          .setMinorFragmentId(0) // minor fragment ID is going to be always 0, as plan will be split
+          .setQueryId(queryId) //
+          .build();
+
+      // Create a minorFragment for each major fragment.
+      for (int minorFragmentId = 0; minorFragmentId < plansCount; minorFragmentId++) {
+        // those fragments should be empty
+        List<PlanFragment> fragments = Lists.newArrayList();
+
+        PlanFragment rootFragment = null;
+        FragmentRoot rootOperator = null;
+
+        IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+        wrapper.resetAllocation();
+        // two visitors here
+        // 1. To remove exchange
+        // 2. To reset operator IDs as exchanges were removed
+        PhysicalOperator op = physicalOperatorRoot.accept(ExchangeRemoverMaterializer.INSTANCE, iNode).
+            accept(OperatorIdVisitor.INSTANCE, 0);
+        Preconditions.checkArgument(op instanceof FragmentRoot);
+        FragmentRoot root = (FragmentRoot) op;
+
+        // get plan as JSON
+        String plan;
+        String optionsData;
+        try {
+          plan = reader.writeJson(root);
+          optionsData = reader.writeJson(options);
+        } catch (JsonProcessingException e) {
+          throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
+        }
+
+        PlanFragment fragment = PlanFragment.newBuilder() //
+            .setForeman(endPoints[minorFragmentId]) //
+            .setFragmentJson(plan) //
+            .setHandle(handle) //
+            .setAssignment(endPoints[minorFragmentId]) //
+            .setLeafFragment(isLeafFragment) //
+            .setContext(queryContextInfo)
+            .setMemInitial(initialAllocation)//
+            .setMemMax(wrapper.getMaxAllocation()) // TODO - for some reason OOM is using leaf fragment max allocation divided by width
+            .setOptionsJson(optionsData)
+            .setCredentials(session.getCredentials())
+            .addAllCollector(CountRequiredFragments.getCollectors(root))
+            .build();
+
+        if (isRootNode) {
+          logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
+          rootFragment = fragment;
+          rootOperator = root;
+        } else {
+          logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
+          throw new ForemanSetupException(String.format("There should not be non-root/remote fragment present in plan split, but there is:",
+              DrillStringUtils.unescapeJava(fragment.toString())));
+         }
+        // fragments should be always empty here
+        workUnits.add(new QueryWorkUnit(rootOperator, rootFragment, fragments));
+      }
+    }
+    return workUnits;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5ff6a6d..86abaca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -31,7 +31,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
+import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
@@ -39,6 +41,7 @@ import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.BasicClientWithConnection;
 import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 import org.apache.drill.exec.rpc.Response;
@@ -100,6 +103,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
         return QueryResult.getDefaultInstance();
     case RpcType.QUERY_DATA_VALUE:
       return QueryData.getDefaultInstance();
+    case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
+      return QueryPlanFragments.getDefaultInstance();
     }
     throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
   }
@@ -137,4 +142,14 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
   public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
     return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
   }
+
+  /**
+   * planQuery is an API to plan a query without query execution
+   * @param req - data necessary to plan query
+   * @return list of PlanFragments that can later on be submitted for execution
+   */
+  public DrillRpcFuture<QueryPlanFragments> planQuery(
+      GetQueryPlanFragments req) {
+    return send(RpcType.GET_QUERY_PLAN_FRAGMENTS, req, QueryPlanFragments.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 22d3634..f0cbb22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
@@ -45,6 +47,8 @@ public class UserRpcConfig {
         .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
         .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
         .add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
+        .add(RpcType.GET_QUERY_PLAN_FRAGMENTS, GetQueryPlanFragments.class,
+          RpcType.QUERY_PLAN_FRAGMENTS, QueryPlanFragments.class) // user to bit
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 7e90747..09bc5c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -131,7 +132,13 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       } catch (final InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding QueryId body.", e);
       }
-
+    case RpcType.GET_QUERY_PLAN_FRAGMENTS_VALUE:
+      try {
+        final GetQueryPlanFragments req = GetQueryPlanFragments.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        return new Response(RpcType.QUERY_PLAN_FRAGMENTS, worker.getQueryPlan(connection, req));
+      } catch(final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding GetQueryPlanFragments body.", e);
+      }
     default:
       throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
new file mode 100644
index 0000000..38dfcd0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.server.options.OptionManager;
+
+public class MemoryAllocationUtilities {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class);
+
+  /**
+   * Helper method to setup SortMemoryAllocations
+   * since this method can be used in multiple places adding it in this class
+   * rather than keeping it in Foreman
+   * @param plan
+   * @param queryContext
+   */
+  public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
+    // look for external sorts
+    final List<ExternalSort> sortList = new LinkedList<>();
+    for (final PhysicalOperator op : plan.getSortedOperators()) {
+      if (op instanceof ExternalSort) {
+        sortList.add((ExternalSort) op);
+      }
+    }
+
+    // if there are any sorts, compute the maximum allocation, and set it on them
+    if (sortList.size() > 0) {
+      final OptionManager optionManager = queryContext.getOptions();
+      final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
+      long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
+          queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
+      maxAllocPerNode = Math.min(maxAllocPerNode,
+          optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
+      final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
+      logger.debug("Max sort alloc: {}", maxSortAlloc);
+
+      for(final ExternalSort externalSort : sortList) {
+        externalSort.setMaxAllocation(maxSortAlloc);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 345af31..1ed8909 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 public class Utilities {
+
   public static String getFileNameForQueryFragment(FragmentContext context, String location, String tag) {
      /*
      * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
@@ -66,5 +67,4 @@ public class Utilities {
       String v = Utilities.class.getPackage().getImplementationVersion();
       return v;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 29c7971..5137cde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.drill.common.CatastrophicFailure;
 import org.apache.drill.common.EventProcessor;
 import org.apache.drill.common.concurrent.ExtendedLatch;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -45,14 +44,12 @@ import org.apache.drill.exec.coord.DistributedSemaphore;
 import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
@@ -77,6 +74,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.util.MemoryAllocationUtilities;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.QueryWorkUnit;
@@ -89,6 +87,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
@@ -250,6 +249,9 @@ public class Foreman implements Runnable {
       case SQL:
         runSQL(queryRequest.getPlan());
         break;
+      case EXECUTION:
+        runFragment(queryRequest.getFragmentsList());
+        break;
       default:
         throw new IllegalStateException();
       }
@@ -394,7 +396,7 @@ public class Foreman implements Runnable {
 
   private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
     validatePlan(plan);
-    setupSortMemoryAllocations(plan);
+    MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
     if (queuingEnabled) {
       acquireQuerySemaphore(plan);
       moveToState(QueryState.STARTING, null);
@@ -419,6 +421,60 @@ public class Foreman implements Runnable {
     logger.debug("Fragments running.");
   }
 
+  /**
+   * This is a helper method to run query based on the list of PlanFragment that were planned
+   * at some point of time
+   * @param fragmentsList
+   * @throws ExecutionSetupException
+   */
+  private void runFragment(List<PlanFragment> fragmentsList) throws ExecutionSetupException {
+    // need to set QueryId, MinorFragment for incoming Fragments
+    PlanFragment rootFragment = null;
+    boolean isFirst = true;
+    final List<PlanFragment> planFragments = Lists.newArrayList();
+    for (PlanFragment myFragment : fragmentsList) {
+      final FragmentHandle handle = myFragment.getHandle();
+      // though we have new field in the FragmentHandle - parentQueryId
+      // it can not be used until every piece of code that creates handle is using it, as otherwise
+      // comparisons on that handle fail that causes fragment runtime failure
+      final FragmentHandle newFragmentHandle = FragmentHandle.newBuilder().setMajorFragmentId(handle.getMajorFragmentId())
+          .setMinorFragmentId(handle.getMinorFragmentId()).setQueryId(queryId)
+          .build();
+      final PlanFragment newFragment = PlanFragment.newBuilder(myFragment).setHandle(newFragmentHandle).build();
+      if (isFirst) {
+        rootFragment = newFragment;
+        isFirst = false;
+      } else {
+        planFragments.add(newFragment);
+      }
+    }
+
+    final FragmentRoot rootOperator;
+    try {
+      rootOperator = drillbitContext.getPlanReader().readFragmentOperator(rootFragment.getFragmentJson());
+    } catch (IOException e) {
+      throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
+    }
+    if (queuingEnabled) {
+      acquireQuerySemaphore(rootOperator.getCost());
+      moveToState(QueryState.STARTING, null);
+    }
+    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
+    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
+
+    logger.debug("Submitting fragments to run.");
+
+    // set up the root fragment first so we'll have incoming buffers available.
+    setupRootFragment(rootFragment, rootOperator);
+
+    setupNonRootFragments(planFragments);
+
+    moveToState(QueryState.RUNNING, null);
+    logger.debug("Fragments running.");
+  }
+
+
+
   private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
     if (plan.getProperties().resultMode != ResultMode.EXEC) {
       throw new ForemanSetupException(String.format(
@@ -427,32 +483,6 @@ public class Foreman implements Runnable {
     }
   }
 
-  private void setupSortMemoryAllocations(final PhysicalPlan plan) {
-    // look for external sorts
-    final List<ExternalSort> sortList = new LinkedList<>();
-    for (final PhysicalOperator op : plan.getSortedOperators()) {
-      if (op instanceof ExternalSort) {
-        sortList.add((ExternalSort) op);
-      }
-    }
-
-    // if there are any sorts, compute the maximum allocation, and set it on them
-    if (sortList.size() > 0) {
-      final OptionManager optionManager = queryContext.getOptions();
-      final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
-      long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
-          queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
-      maxAllocPerNode = Math.min(maxAllocPerNode,
-          optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
-      final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
-      logger.debug("Max sort alloc: {}", maxSortAlloc);
-
-      for(final ExternalSort externalSort : sortList) {
-        externalSort.setMaxAllocation(maxSortAlloc);
-      }
-    }
-  }
-
   /**
    * This limits the number of "small" and "large" queries that a Drill cluster will run
    * simultaneously, if queueing is enabled. If the query is unable to run, this will block
@@ -463,13 +493,19 @@ public class Foreman implements Runnable {
    * @throws ForemanSetupException
    */
   private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
-    final OptionManager optionManager = queryContext.getOptions();
-    final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
     double totalCost = 0;
     for (final PhysicalOperator ops : plan.getSortedOperators()) {
       totalCost += ops.getCost();
     }
 
+    acquireQuerySemaphore(totalCost);
+    return;
+  }
+
+  private void acquireQuerySemaphore(double totalCost) throws ForemanSetupException {
+    final OptionManager optionManager = queryContext.getOptions();
+    final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
+
     final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
     final String queueName;
 
@@ -502,7 +538,6 @@ public class Foreman implements Runnable {
               queueName, queueTimeout / 1000)
           .build(logger);
     }
-
   }
 
   Exception getCurrentException() {
@@ -983,6 +1018,10 @@ public class Foreman implements Runnable {
    * @throws ForemanException
    */
   private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
+    if (fragments.isEmpty()) {
+      // nothing to do here
+      return;
+    }
     /*
      * We will send a single message to each endpoint, regardless of how many fragments will be
      * executed there. We need to start up the intermediate fragments first so that they will be

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
new file mode 100644
index 0000000..eb3e86c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.user;
+
+import java.util.List;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.contrib.SplittingParallelizer;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.MemoryAllocationUtilities;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Helper class to return PlanFragments based on the query plan
+ * or based on split query plan
+ * As of now it is only invoked once per query and therefore cheap to create PlanSplitter object
+ * on heap.
+ */
+public class PlanSplitter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanSplitter.class);
+
+
+  /**
+   * Method to plan the query and return list of fragments
+   * it will return query plan "as is" or split plans based on the req setting: split_plan
+   * @param dContext
+   * @param queryId
+   * @param req
+   * @param connection
+   * @return
+   */
+  public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId queryId,
+      GetQueryPlanFragments req, UserClientConnection connection) {
+    QueryPlanFragments.Builder responseBuilder = QueryPlanFragments.newBuilder();
+    QueryContext queryContext = new QueryContext(connection.getSession(), dContext, queryId);
+
+    responseBuilder.setQueryId(queryId);
+
+    try {
+      responseBuilder.addAllFragments(getFragments(dContext, req, queryContext, queryId));
+      responseBuilder.setStatus(QueryState.COMPLETED);
+    } catch (Exception e) {
+      final String errorMessage = String.format("Failed to produce PlanFragments for query id \"%s\" with "
+          + "request to %s plan", queryId, (req.getSplitPlan() ? "split" : "no split"));
+      DrillPBError error = DrillPBError.newBuilder().setMessage(errorMessage).setErrorType(DrillPBError.ErrorType.PLAN).build();
+
+      responseBuilder.setStatus(QueryState.FAILED);
+      responseBuilder.setError(error);
+    }
+    return responseBuilder.build();
+  }
+
+  private List<PlanFragment> getFragments(final DrillbitContext dContext, final GetQueryPlanFragments req,
+      final QueryContext queryContext, final QueryId queryId) throws Exception {
+    final PhysicalPlan plan;
+    final String query = req.getQuery();
+    switch(req.getType()) {
+    case SQL:
+      final Pointer<String> textPlan = new Pointer<>();
+      plan = DrillSqlWorker.getPlan(queryContext, query, textPlan);
+      break;
+    case PHYSICAL:
+      plan = dContext.getPlanReader().readPhysicalPlan(query);
+      break;
+    default:
+      throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType");
+    }
+
+    MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+
+    final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+
+    final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+    final SimpleParallelizer parallelizer = new SplittingParallelizer(queryContext);
+
+    List<PlanFragment> fragments = Lists.newArrayList();
+
+    if ( req.getSplitPlan() ) {
+      final List<QueryWorkUnit> queryWorkUnits = parallelizer.getSplitFragments(
+          queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
+          queryId, queryContext.getActiveEndpoints(), dContext.getPlanReader(), rootFragment,
+          queryContext.getSession(), queryContext.getQueryContextInfo());
+
+      for (QueryWorkUnit queryWorkUnit : queryWorkUnits) {
+        fragments.add(queryWorkUnit.getRootFragment());
+
+        List<PlanFragment> childFragments = queryWorkUnit.getFragments();
+        if (!childFragments.isEmpty()) {
+          throw new IllegalStateException("Split plans can not have more then one fragment");
+        }
+      }
+    } else {
+      final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
+          queryId, queryContext.getActiveEndpoints(), dContext.getPlanReader(), rootFragment,
+          queryContext.getSession(), queryContext.getQueryContextInfo());
+      fragments.add(queryWorkUnit.getRootFragment());
+      fragments.addAll(queryWorkUnit.getFragments());
+    }
+    return fragments;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index e8deb4d..27126d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -21,6 +21,8 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
+import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -46,14 +48,23 @@ public class UserWorker{
     this.bee = bee;
   }
 
-  public QueryId submitWork(UserClientConnection connection, RunQuery query) {
+  /**
+   * Helper method to generate QueryId
+   * @return generated QueryId
+   */
+  private QueryId queryIdGenerator() {
     ThreadLocalRandom r = ThreadLocalRandom.current();
 
     // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence).  Last 12 bytes are random.
-    long time = (int) (System.currentTimeMillis()/1000);
-    long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
-    long p2 = r.nextLong();
-    QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
+    final long time = (int) (System.currentTimeMillis()/1000);
+    final long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
+    final long p2 = r.nextLong();
+    final QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
+    return id;
+  }
+
+  public QueryId submitWork(UserClientConnection connection, RunQuery query) {
+    final QueryId id = queryIdGenerator();
     incrementer.increment(connection.getSession());
     Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
     bee.addNewForeman(foreman);
@@ -80,4 +91,10 @@ public class UserWorker{
     return bee.getContext().getOptionManager();
   }
 
+  public QueryPlanFragments getQueryPlan(UserClientConnection connection,
+      GetQueryPlanFragments req) {
+    final QueryId queryId = queryIdGenerator();
+    final QueryPlanFragments qPlanFragments = new PlanSplitter().planFragments(bee.getContext(), queryId, req, connection);
+    return qPlanFragments;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
new file mode 100644
index 0000000..0481825
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import static org.junit.Assert.*;
+import io.netty.buffer.DrillBuf;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.DrillAutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.client.PrintingResultsListener;
+import org.apache.drill.exec.client.QuerySubmitter.Format;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.util.VectorUtil;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Class to test different planning use cases (separate form query execution)
+ *
+ */
+public class DrillSeparatePlanningTest extends BaseTestQuery {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSeparatePlanningTest.class);
+
+  static final String WORKING_PATH = TestTools.getWorkingPath();
+  static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+  //final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+  //final String query = "SELECT * FROM cp.`employee.json` where  employee_id > 1 and  employee_id < 1000";
+  //final String query = "SELECT o_orderkey, o_custkey FROM dfs.tmp.`multilevel` where dir0 = 1995 and o_orderkey > 100 and o_orderkey < 1000 limit 5";
+  //final String query = "SELECT sum(o_totalprice) FROM dfs.tmp.`multilevel` where dir0 = 1995 and o_orderkey > 100 and o_orderkey < 1000";
+  //final String query = "SELECT o_orderkey FROM dfs.tmp.`multilevel` order by o_orderkey";
+  //final String query = "SELECT dir1, sum(o_totalprice) FROM dfs.tmp.`multilevel` where dir0 = 1995 group by dir1 order by dir1";
+  //final String query = String.format("SELECT dir0, sum(o_totalprice) FROM dfs_test.`%s/multilevel/json` group by dir0 order by dir0", TEST_RES_PATH);
+
+
+  @Test(timeout=30000)
+  public void testSingleFragmentQuery() throws Exception {
+    final String query = "SELECT * FROM cp.`employee.json` where  employee_id > 1 and  employee_id < 1000";
+
+    QueryPlanFragments planFragments = getFragmentsHelper(query);
+
+    assertNotNull(planFragments);
+
+    assertEquals(1, planFragments.getFragmentsCount());
+    assertTrue(planFragments.getFragments(0).getLeafFragment());
+
+    getResultsHelper(planFragments);
+  }
+
+  @Test(timeout=30000)
+  public void testMultiMinorFragmentSimpleQuery() throws Exception {
+    final String query = String.format("SELECT o_orderkey FROM dfs_test.`%s/multilevel/json`", TEST_RES_PATH);
+
+    QueryPlanFragments planFragments = getFragmentsHelper(query);
+
+    assertNotNull(planFragments);
+
+    assertTrue((planFragments.getFragmentsCount() > 1));
+
+    for ( PlanFragment planFragment : planFragments.getFragmentsList()) {
+      assertTrue(planFragment.getLeafFragment());
+    }
+
+    getResultsHelper(planFragments);
+  }
+
+  @Test(timeout=30000)
+  public void testMultiMinorFragmentComplexQuery() throws Exception {
+    final String query = String.format("SELECT dir0, sum(o_totalprice) FROM dfs_test.`%s/multilevel/json` group by dir0 order by dir0", TEST_RES_PATH);
+
+    QueryPlanFragments planFragments = getFragmentsHelper(query);
+
+    assertNotNull(planFragments);
+
+    assertTrue((planFragments.getFragmentsCount() > 1));
+
+    for ( PlanFragment planFragment : planFragments.getFragmentsList()) {
+      assertTrue(planFragment.getLeafFragment());
+    }
+
+    getResultsHelper(planFragments);
+
+  }
+
+  @Test(timeout=30000)
+  public void testPlanningNoSplit() throws Exception {
+    final String query = String.format("SELECT dir0, sum(o_totalprice) FROM dfs_test.`%s/multilevel/json` group by dir0 order by dir0", TEST_RES_PATH);
+
+    updateTestCluster(2, config);
+
+    List<QueryDataBatch> results = client.runQuery(QueryType.SQL, "alter session set `planner.slice_target`=1");
+    for(QueryDataBatch batch : results) {
+      batch.release();
+    }
+
+    DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(QueryType.SQL, query, false);
+
+    final QueryPlanFragments planFragments = queryFragmentsFutures.get();
+
+    assertNotNull(planFragments);
+
+    assertTrue((planFragments.getFragmentsCount() > 1));
+
+    PlanFragment rootFragment = planFragments.getFragments(0);
+    assertFalse(rootFragment.getLeafFragment());
+
+    getCombinedResultsHelper(planFragments);
+
+  }
+
+  @Test(timeout=30000)
+  public void testPlanningNegative() throws Exception {
+    final String query = String.format("SELECT dir0, sum(o_totalprice) FROM dfs_test.`%s/multilevel/json` group by dir0 order by dir0", TEST_RES_PATH);
+
+    updateTestCluster(2, config);
+    // LOGICAL is not supported
+    DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(QueryType.LOGICAL, query, false);
+
+    final QueryPlanFragments planFragments = queryFragmentsFutures.get();
+
+    assertNotNull(planFragments);
+
+    assertNotNull(planFragments.getError());
+
+    assertTrue(planFragments.getFragmentsCount()==0);
+
+  }
+
+  @Test(timeout=30000)
+  public void testPlanning() throws Exception {
+    final String query = String.format("SELECT dir0, columns[3] FROM dfs_test.`%s/multilevel/csv` order by dir0", TEST_RES_PATH);
+
+    updateTestCluster(2, config);
+
+    List<QueryDataBatch> results = client.runQuery(QueryType.SQL, "alter session set `planner.slice_target`=1");
+    for(QueryDataBatch batch : results) {
+      batch.release();
+    }
+    AwaitableUserResultsListener listener =
+        new AwaitableUserResultsListener(new PrintingResultsListener(client.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH));
+    //AwaitableUserResultsListener listener =
+    //    new AwaitableUserResultsListener(new SilentListener());
+    client.runQuery(QueryType.SQL, query, listener);
+    int rows = listener.await();
+  }
+
+  private QueryPlanFragments getFragmentsHelper(final String query) throws InterruptedException, ExecutionException, RpcException {
+    updateTestCluster(2, config);
+
+    List<QueryDataBatch> results = client.runQuery(QueryType.SQL, "alter session set `planner.slice_target`=1");
+    for(QueryDataBatch batch : results) {
+      batch.release();
+    }
+
+    DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(QueryType.SQL, query, true);
+
+    final QueryPlanFragments planFragments = queryFragmentsFutures.get();
+
+    for (PlanFragment fragment : planFragments.getFragmentsList()) {
+      System.out.println(fragment.getFragmentJson());
+    }
+
+    return planFragments;
+  }
+
+  private void getResultsHelper(final QueryPlanFragments planFragments) throws Exception {
+    for (PlanFragment fragment : planFragments.getFragmentsList()) {
+      DrillbitEndpoint assignedNode = fragment.getAssignment();
+      DrillClient fragmentClient = new DrillClient(true);
+      Properties props = new Properties();
+      props.setProperty("drillbit", assignedNode.getAddress() + ":" + assignedNode.getUserPort());
+      fragmentClient.connect(props);
+
+      ShowResultsUserResultsListener myListener = new ShowResultsUserResultsListener(getAllocator());
+      AwaitableUserResultsListener listenerBits =
+          new AwaitableUserResultsListener(myListener);
+      fragmentClient.runQuery(QueryType.SQL, "select hostname, user_port from sys.drillbits where `current`=true",
+          listenerBits);
+      int row = listenerBits.await();
+      assertEquals(1, row);
+      List<Map<String,String>> records = myListener.getRecords();
+      assertEquals(1, records.size());
+      Map<String,String> record = records.get(0);
+      assertEquals(2, record.size());
+      Iterator<Entry<String, String>> iter = record.entrySet().iterator();
+      Entry<String, String> entry;
+      String host = null;
+      String port = null;
+      for (int i = 0; i < 2; i++) {
+       entry = iter.next();
+       if (entry.getKey().equalsIgnoreCase("hostname")) {
+          host = entry.getValue();
+        } else if (entry.getKey().equalsIgnoreCase("user_port")) {
+          port = entry.getValue();
+        } else {
+          fail("Unknown field: " + entry.getKey());
+        }
+       }
+      assertTrue(props.getProperty("drillbit").equalsIgnoreCase(host+":" + port));
+
+      List<PlanFragment> fragmentList = Lists.newArrayList();
+      fragmentList.add(fragment);
+      //AwaitableUserResultsListener listener =
+     //     new AwaitableUserResultsListener(new PrintingResultsListener(client.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH));
+      AwaitableUserResultsListener listener =
+          new AwaitableUserResultsListener(new SilentListener());
+      fragmentClient.runQuery(QueryType.EXECUTION, fragmentList, listener);
+      int rows = listener.await();
+      fragmentClient.close();
+    }
+  }
+
+  private void getCombinedResultsHelper(final QueryPlanFragments planFragments) throws Exception {
+      ShowResultsUserResultsListener myListener = new ShowResultsUserResultsListener(getAllocator());
+      AwaitableUserResultsListener listenerBits =
+          new AwaitableUserResultsListener(myListener);
+
+      //AwaitableUserResultsListener listener =
+     //     new AwaitableUserResultsListener(new PrintingResultsListener(client.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH));
+      AwaitableUserResultsListener listener =
+          new AwaitableUserResultsListener(new SilentListener());
+      client.runQuery(QueryType.EXECUTION, planFragments.getFragmentsList(), listener);
+      int rows = listener.await();
+  }
+
+  /**
+   * Helper class to get results
+   *
+   */
+  static class ShowResultsUserResultsListener implements UserResultsListener {
+
+    private QueryId queryId;
+    private final RecordBatchLoader loader;
+    private final BufferAllocator allocator;
+    private UserException ex;
+    private List<Map<String,String>> records = Lists.newArrayList();
+
+    public ShowResultsUserResultsListener(BufferAllocator allocator) {
+      this.loader = new RecordBatchLoader(allocator);
+      this.allocator = allocator;
+    }
+
+    public QueryId getQueryId() {
+      return queryId;
+    }
+
+    public List<Map<String, String>> getRecords() {
+      return records;
+    }
+
+    public UserException getEx() {
+      return ex;
+    }
+
+    @Override
+    public void queryIdArrived(QueryId queryId) {
+     this.queryId = queryId;
+    }
+
+    @Override
+    public void submissionFailed(UserException ex) {
+      DrillAutoCloseables.closeNoChecked(allocator);
+      this.ex = ex;
+    }
+
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      QueryData queryHeader = result.getHeader();
+      int rows = queryHeader.getRowCount();
+      try {
+        if ( result.hasData() ) {
+          DrillBuf data = result.getData();
+          loader.load(queryHeader.getDef(), data);
+          for (int i = 0; i < rows; i++) {
+             Map<String,String> record = Maps.newHashMap();
+            for (VectorWrapper<?> vw : loader) {
+              final String field = vw.getValueVector().getMetadata().getNamePart().getName();
+              final ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
+              final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
+              final String display = value == null ? null : value.toString();
+              record.put(field, display);
+            }
+            records.add(record);
+          }
+          loader.clear();
+        }
+        result.release();
+      } catch (SchemaChangeException e) {
+        fail(e.getMessage());
+      }
+
+    }
+
+    @Override
+    public void queryCompleted(QueryState state) {
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/6bba69d4/protocol/src/main/java/org/apache/drill/exec/proto/ExecProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/ExecProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/ExecProtos.java
index 7ca17f1..5b986cf 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/ExecProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/ExecProtos.java
@@ -61,6 +61,20 @@ public final class ExecProtos {
      * <code>optional int32 minor_fragment_id = 3;</code>
      */
     int getMinorFragmentId();
+
+    // optional .exec.shared.QueryId parent_query_id = 4;
+    /**
+     * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+     */
+    boolean hasParentQueryId();
+    /**
+     * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+     */
+    org.apache.drill.exec.proto.UserBitShared.QueryId getParentQueryId();
+    /**
+     * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+     */
+    org.apache.drill.exec.proto.UserBitShared.QueryIdOrBuilder getParentQueryIdOrBuilder();
   }
   /**
    * Protobuf type {@code exec.bit.FragmentHandle}
@@ -136,6 +150,19 @@ public final class ExecProtos {
               minorFragmentId_ = input.readInt32();
               break;
             }
+            case 34: {
+              org.apache.drill.exec.proto.UserBitShared.QueryId.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000008) == 0x00000008)) {
+                subBuilder = parentQueryId_.toBuilder();
+              }
+              parentQueryId_ = input.readMessage(org.apache.drill.exec.proto.UserBitShared.QueryId.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(parentQueryId_);
+                parentQueryId_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000008;
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -230,10 +257,33 @@ public final class ExecProtos {
       return minorFragmentId_;
     }
 
+    // optional .exec.shared.QueryId parent_query_id = 4;
+    public static final int PARENT_QUERY_ID_FIELD_NUMBER = 4;
+    private org.apache.drill.exec.proto.UserBitShared.QueryId parentQueryId_;
+    /**
+     * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+     */
+    public boolean hasParentQueryId() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+     */
+    public org.apache.drill.exec.proto.UserBitShared.QueryId getParentQueryId() {
+      return parentQueryId_;
+    }
+    /**
+     * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+     */
+    public org.apache.drill.exec.proto.UserBitShared.QueryIdOrBuilder getParentQueryIdOrBuilder() {
+      return parentQueryId_;
+    }
+
     private void initFields() {
       queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
       majorFragmentId_ = 0;
       minorFragmentId_ = 0;
+      parentQueryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -256,6 +306,9 @@ public final class ExecProtos {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeInt32(3, minorFragmentId_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(4, parentQueryId_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -277,6 +330,10 @@ public final class ExecProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(3, minorFragmentId_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(4, parentQueryId_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -386,6 +443,7 @@ public final class ExecProtos {
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
           getQueryIdFieldBuilder();
+          getParentQueryIdFieldBuilder();
         }
       }
       private static Builder create() {
@@ -404,6 +462,12 @@ public final class ExecProtos {
         bitField0_ = (bitField0_ & ~0x00000002);
         minorFragmentId_ = 0;
         bitField0_ = (bitField0_ & ~0x00000004);
+        if (parentQueryIdBuilder_ == null) {
+          parentQueryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
+        } else {
+          parentQueryIdBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -448,6 +512,14 @@ public final class ExecProtos {
           to_bitField0_ |= 0x00000004;
         }
         result.minorFragmentId_ = minorFragmentId_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        if (parentQueryIdBuilder_ == null) {
+          result.parentQueryId_ = parentQueryId_;
+        } else {
+          result.parentQueryId_ = parentQueryIdBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -473,6 +545,9 @@ public final class ExecProtos {
         if (other.hasMinorFragmentId()) {
           setMinorFragmentId(other.getMinorFragmentId());
         }
+        if (other.hasParentQueryId()) {
+          mergeParentQueryId(other.getParentQueryId());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -683,6 +758,123 @@ public final class ExecProtos {
         return this;
       }
 
+      // optional .exec.shared.QueryId parent_query_id = 4;
+      private org.apache.drill.exec.proto.UserBitShared.QueryId parentQueryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.drill.exec.proto.UserBitShared.QueryId, org.apache.drill.exec.proto.UserBitShared.QueryId.Builder, org.apache.drill.exec.proto.UserBitShared.QueryIdOrBuilder> parentQueryIdBuilder_;
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public boolean hasParentQueryId() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public org.apache.drill.exec.proto.UserBitShared.QueryId getParentQueryId() {
+        if (parentQueryIdBuilder_ == null) {
+          return parentQueryId_;
+        } else {
+          return parentQueryIdBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public Builder setParentQueryId(org.apache.drill.exec.proto.UserBitShared.QueryId value) {
+        if (parentQueryIdBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          parentQueryId_ = value;
+          onChanged();
+        } else {
+          parentQueryIdBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public Builder setParentQueryId(
+          org.apache.drill.exec.proto.UserBitShared.QueryId.Builder builderForValue) {
+        if (parentQueryIdBuilder_ == null) {
+          parentQueryId_ = builderForValue.build();
+          onChanged();
+        } else {
+          parentQueryIdBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public Builder mergeParentQueryId(org.apache.drill.exec.proto.UserBitShared.QueryId value) {
+        if (parentQueryIdBuilder_ == null) {
+          if (((bitField0_ & 0x00000008) == 0x00000008) &&
+              parentQueryId_ != org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance()) {
+            parentQueryId_ =
+              org.apache.drill.exec.proto.UserBitShared.QueryId.newBuilder(parentQueryId_).mergeFrom(value).buildPartial();
+          } else {
+            parentQueryId_ = value;
+          }
+          onChanged();
+        } else {
+          parentQueryIdBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public Builder clearParentQueryId() {
+        if (parentQueryIdBuilder_ == null) {
+          parentQueryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
+          onChanged();
+        } else {
+          parentQueryIdBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000008);
+        return this;
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public org.apache.drill.exec.proto.UserBitShared.QueryId.Builder getParentQueryIdBuilder() {
+        bitField0_ |= 0x00000008;
+        onChanged();
+        return getParentQueryIdFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      public org.apache.drill.exec.proto.UserBitShared.QueryIdOrBuilder getParentQueryIdOrBuilder() {
+        if (parentQueryIdBuilder_ != null) {
+          return parentQueryIdBuilder_.getMessageOrBuilder();
+        } else {
+          return parentQueryId_;
+        }
+      }
+      /**
+       * <code>optional .exec.shared.QueryId parent_query_id = 4;</code>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.drill.exec.proto.UserBitShared.QueryId, org.apache.drill.exec.proto.UserBitShared.QueryId.Builder, org.apache.drill.exec.proto.UserBitShared.QueryIdOrBuilder> 
+          getParentQueryIdFieldBuilder() {
+        if (parentQueryIdBuilder_ == null) {
+          parentQueryIdBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.drill.exec.proto.UserBitShared.QueryId, org.apache.drill.exec.proto.UserBitShared.QueryId.Builder, org.apache.drill.exec.proto.UserBitShared.QueryIdOrBuilder>(
+                  parentQueryId_,
+                  getParentForChildren(),
+                  isClean());
+          parentQueryId_ = null;
+        }
+        return parentQueryIdBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.FragmentHandle)
     }
 
@@ -709,11 +901,12 @@ public final class ExecProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\025ExecutionProtos.proto\022\010exec.bit\032\022Coord" +
-      "ination.proto\032\023UserBitShared.proto\"n\n\016Fr" +
-      "agmentHandle\022&\n\010query_id\030\001 \001(\0132\024.exec.sh" +
-      "ared.QueryId\022\031\n\021major_fragment_id\030\002 \001(\005\022" +
-      "\031\n\021minor_fragment_id\030\003 \001(\005B+\n\033org.apache" +
-      ".drill.exec.protoB\nExecProtosH\001"
+      "ination.proto\032\023UserBitShared.proto\"\235\001\n\016F" +
+      "ragmentHandle\022&\n\010query_id\030\001 \001(\0132\024.exec.s" +
+      "hared.QueryId\022\031\n\021major_fragment_id\030\002 \001(\005" +
+      "\022\031\n\021minor_fragment_id\030\003 \001(\005\022-\n\017parent_qu" +
+      "ery_id\030\004 \001(\0132\024.exec.shared.QueryIdB+\n\033or" +
+      "g.apache.drill.exec.protoB\nExecProtosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -725,7 +918,7 @@ public final class ExecProtos {
           internal_static_exec_bit_FragmentHandle_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_FragmentHandle_descriptor,
-              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", });
+              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ParentQueryId", });
           return null;
         }
       };