You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/07 03:20:58 UTC

[4/6] DRILL-381: Implement SYSTEM and SESSION options.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
new file mode 100644
index 0000000..7fb8b6c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.DrillConfigIterator;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Iterables;
+
+public class SystemTablePlugin extends AbstractStoragePlugin{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
+
+  private final DrillbitContext context;
+  private final String name;
+
+  public SystemTablePlugin(SystemTablePluginConfig configuration, DrillbitContext context, String name){
+    this.context = context;
+    this.name = name;
+  }
+
+  private SystemSchema schema = new SystemSchema();
+
+  @Override
+  public StoragePluginConfig getConfig() {
+    return SystemTablePluginConfig.INSTANCE;
+  }
+
+  @Override
+  public void registerSchemas(DrillUser user, SchemaPlus parent) {
+    parent.add(schema.getName(), schema);
+  }
+
+  public Iterator<Object> getRecordIterator(FragmentContext context, SystemTable table){
+    switch(table){
+    case DRILLBITS:
+      return new DrillbitIterator(context);
+    case OPTION:
+
+      return Iterables.concat((Iterable<Object>)(Object) new DrillConfigIterator(context.getConfig()), //
+          context.getOptions()).iterator();
+    default:
+      throw new UnsupportedOperationException("Unable to create record iterator for table: " + table.getTableName());
+    }
+  }
+
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+    SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
+    return new SystemTableScan(table, this);
+  }
+
+  private class SystemSchema extends AbstractSchema{
+
+    private Set<String> tableNames;
+
+    public SystemSchema() {
+      super("sys");
+      Set<String> names = Sets.newHashSet();
+      for(SystemTable t : SystemTable.values()){
+        names.add(t.getTableName());
+      }
+      this.tableNames = ImmutableSet.copyOf(names);
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return tableNames;
+    }
+
+
+    @Override
+    public DrillTable getTable(String name) {
+      for(SystemTable table : SystemTable.values()){
+        if(table.getTableName().equalsIgnoreCase(name)){
+          return new StaticDrillTable(table.getType(), SystemTablePlugin.this.name, SystemTablePlugin.this, table);
+        }
+      }
+      return null;
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
new file mode 100644
index 0000000..bca9881
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+public class SystemTablePluginConfig implements StoragePluginConfig{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
+
+  String name = "system-tables";
+
+  public static SystemTablePluginConfig INSTANCE = new SystemTablePluginConfig();
+
+  private SystemTablePluginConfig(){
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
new file mode 100644
index 0000000..9a745ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import parquet.org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("sys")
+public class SystemTableScan extends AbstractGroupScan implements SubScan{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);
+
+  private final SystemTable table;
+  private final SystemTablePlugin plugin;
+
+  @JsonCreator
+  public SystemTableScan( //
+      @JsonProperty("table") SystemTable table, //
+      @JacksonInject StoragePluginRegistry engineRegistry //
+      ) throws IOException, ExecutionSetupException {
+    this.table = table;
+    this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
+  }
+
+  public SystemTableScan(SystemTable table, SystemTablePlugin plugin){
+    this.table = table;
+    this.plugin = plugin;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1,1,1,1);
+  }
+
+  @Override
+  public Size getSize() {
+    return new Size(100,1);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    return this;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+    return this;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public long getInitialAllocation() {
+    return initialAllocation;
+  }
+
+  @Override
+  public long getMaxAllocation() {
+    return maxAllocation;
+  }
+
+  @Override
+  public String getDigest() {
+    return "SystemTableScan: " + table.name();
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
+  public SystemTable getTable() {
+    return table;
+  }
+
+  public SystemTablePlugin getPlugin() {
+    return plugin;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 835adad..e7accba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -49,21 +49,21 @@ import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 
 public class ControlHandlerImpl implements ControlMessageHandler {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-  
+
   private final WorkerBee bee;
-  
+
   public ControlHandlerImpl(WorkerBee bee) {
     super();
     this.bee = bee;
   }
 
-  
+
   @Override
   public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
     if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received bit com message of type {}", rpcType);
 
     switch (rpcType) {
-    
+
     case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
       FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
       cancelFragment(handle);
@@ -80,28 +80,28 @@ public class ControlHandlerImpl implements ControlMessageHandler {
         startNewRemoteFragment(fragment);
         return DataRpcConfig.OK;
 
-      } catch (OutOfMemoryException e) {
-        logger.error("Failure while attempting to start remote fragment.", fragment, e);
+      } catch (ExecutionSetupException e) {
+        logger.error("Failure while attempting to start remote fragment.", fragment);
         return new Response(RpcType.ACK, Acks.FAIL);
       }
-      
+
     default:
       throw new RpcException("Not yet supported.");
     }
 
   }
-  
-  
-  
+
+
+
   /* (non-Javadoc)
    * @see org.apache.drill.exec.work.batch.BitComHandler#startNewRemoteFragment(org.apache.drill.exec.proto.ExecProtos.PlanFragment)
    */
   @Override
-  public void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException{
+  public void startNewRemoteFragment(PlanFragment fragment) throws ExecutionSetupException{
     logger.debug("Received remote fragment start instruction", fragment);
     FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, new FunctionImplementationRegistry(bee.getContext().getConfig()));
     ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman());
-    
+
     NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
     try{
       FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
@@ -117,15 +117,15 @@ public class ControlHandlerImpl implements ControlMessageHandler {
       listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e);
     } catch (OutOfMemoryError t) {
       if(t.getMessage().startsWith("Direct buffer")){
-        listener.fail(fragment.getHandle(), "Failure due to error", t);  
+        listener.fail(fragment.getHandle(), "Failure due to error", t);
       }else{
         throw t;
       }
-      
+
     }
-    
+
   }
-  
+
   /* (non-Javadoc)
    * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
    */
@@ -141,10 +141,10 @@ public class ControlHandlerImpl implements ControlMessageHandler {
       FragmentExecutor runner = bee.getFragmentRunner(handle);
       if(runner != null) runner.cancel();
     }
-    
+
     return Acks.OK;
   }
-  
-  
-  
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 95f2dc6..d00478b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
 
 import io.netty.buffer.ByteBuf;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -26,14 +27,13 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
-import org.apache.drill.exec.work.fragment.FragmentManager;
 
 public interface ControlMessageHandler {
 
   public abstract Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
       throws RpcException;
 
-  public abstract void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException;
+  public abstract void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException, ExecutionSetupException;
 
   public abstract Ack cancelFragment(FragmentHandle handle);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6027c44..e73ddde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -21,8 +21,11 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -31,7 +34,9 @@ import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.DistributedMultiMap;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.QueryContext;
@@ -49,6 +54,7 @@ import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
@@ -186,6 +192,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
+
   private void parseAndRunLogicalPlan(String json) {
 
     try {
@@ -260,6 +267,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
+
   private void runPhysicalPlan(PhysicalPlan plan) {
 
     if(plan.getProperties().resultMode != ResultMode.EXEC){
@@ -280,7 +288,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     SimpleParallelizer parallelizer = new SimpleParallelizer();
 
     try {
-      QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
+      QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
               context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH),
               context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT));
 
@@ -329,7 +337,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
     if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
-    return new BasicOptimizer(DrillConfig.create(), context).optimize(new BasicOptimizer.BasicOptimizationContext(), plan);
+    return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(new BasicOptimizer.BasicOptimizationContext(context), plan);
   }
 
   public QueryResult getResult(UserClientConnection connection, RequestResults req) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index c8f2021..5cad658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -60,8 +60,8 @@ public class NonRootFragmentManager implements FragmentManager {
       this.context.setBuffers(buffers);
       this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman()));
       this.reader = context.getPlanReader();
-      
-    }catch(IOException e){
+
+    }catch(ExecutionSetupException | IOException e){
       throw new FragmentSetupException("Failure while decoding fragment.", e);
     }
   }
@@ -92,7 +92,7 @@ public class NonRootFragmentManager implements FragmentManager {
         return null;
       }
     }
-    
+
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 2b8779a..4f5e2e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
@@ -74,4 +75,8 @@ public class UserWorker{
   public SchemaFactory getSchemaFactory(){
     return bee.getContext().getSchemaFactory();
   }
+
+  public OptionManager getSystemOptions(){
+    return bee.getContext().getOptionManager();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 948c763..6770ee7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -39,6 +39,9 @@ import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.junit.Rule;
 import org.junit.rules.TestRule;
@@ -51,7 +54,7 @@ import com.google.common.io.Resources;
 public class PlanningBase extends ExecTest{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningBase.class);
 
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(30000);
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(10000);
 
   @Mocked DrillbitContext dbContext;
   @Mocked QueryContext context;
@@ -68,6 +71,10 @@ public class PlanningBase extends ExecTest{
     final DistributedCache cache = new LocalCache();
     cache.run();
 
+    final SystemOptionManager opt = new SystemOptionManager(cache);
+    opt.init();
+    final OptionManager sess = new SessionOptionManager(opt);
+
     new NonStrictExpectations() {
       {
         dbContext.getMetrics();
@@ -76,6 +83,8 @@ public class PlanningBase extends ExecTest{
         result = new TopLevelAllocator();
         dbContext.getConfig();
         result = config;
+        dbContext.getOptionManager();
+        result = opt;
         dbContext.getCache();
         result = cache;
       }
@@ -88,6 +97,7 @@ public class PlanningBase extends ExecTest{
     registry.getSchemaFactory().registerSchemas(null, root);
 
 
+
     new NonStrictExpectations() {
       {
         context.getNewDefaultSchema();
@@ -97,13 +107,15 @@ public class PlanningBase extends ExecTest{
         context.getFunctionRegistry();
         result = functionRegistry;
         context.getSession();
-        result = new UserSession(null, null);
+        result = new UserSession(null, null, null);
         context.getCurrentEndpoint();
         result = DrillbitEndpoint.getDefaultInstance();
         context.getActiveEndpoints();
         result = ImmutableList.of(DrillbitEndpoint.getDefaultInstance());
         context.getPlannerSettings();
-        result = new PlannerSettings();
+        result = new PlannerSettings(sess);
+        context.getOptions();
+        result = sess;
         context.getConfig();
         result = config;
         context.getCache();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
index 1ccb65c..a459bef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
@@ -20,10 +20,11 @@ package org.apache.drill;
 import org.junit.Ignore;
 import org.junit.Test;
 
+@Ignore // DRILL-648
 public class TestTpchSingleMode extends BaseTestQuery{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchSingleMode.class);
 
-  private static final String SINGLE_MODE = "ALTER SESSION SET NO_EXCHANGES = true;";
+  private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true;";
 
 
   private void testSingleMode(String fileName) throws Exception{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 4b2378d..41b00da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -109,11 +109,11 @@ public class TestOptiqPlans extends ExecTest {
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
     DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus);
-    QueryContext qc = new QueryContext(new UserSession(null, null), QueryId.getDefaultInstance(), bitContext);
+    QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
-    PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc).optimize(
-        new BasicOptimizer.BasicOptimizationContext(), plan);
+    PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
+
 
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
     FragmentContext fctxt = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 37e350e..1b38dce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.junit.Test;
 
@@ -34,17 +35,17 @@ import com.google.common.collect.Lists;
 
 public class TestFragmentChecker extends PopUnitTestBase{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFragmentChecker.class);
-  
-  
+
+
   @Test
   public void checkSimpleExchangePlan() throws Exception{
     print("/physical_double_exchange.json", 2, 3);
 
   }
-  
-  
+
+
   private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws Exception{
-    
+
     System.out.println(String.format("=================Building plan fragments for [%s].  Allowing %d total Drillbits.==================", fragmentFile, bitCount));
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
     Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
@@ -54,27 +55,27 @@ public class TestFragmentChecker extends PopUnitTestBase{
     DrillbitEndpoint localBit = null;
     for(int i =0; i < bitCount; i++){
       DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setControlPort(1234+i).build();
-      if(i ==0) localBit = b1; 
+      if(i ==0) localBit = b1;
       endpoints.add(b1);
     }
-    
-    
-    QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
+
+
+    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
-    
+
     System.out.print(qwu.getRootFragment().getFragmentJson());
-    
-    
+
+
     for(PlanFragment f : qwu.getFragments()){
       System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(), f.getHandle().getMinorFragmentId()));
       System.out.print(f.getFragmentJson());
     }
-    
+
     //assertEquals(exepectedFragmentCount, qwu.getFragments().size());
 
     logger.debug("Planning Set {}", planningSet);
   }
-  
+
   @Test
   public void validateSingleExchangeFragment() throws Exception{
     print("/physical_single_exchange.json", 1, 2);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
new file mode 100644
index 0000000..c522da8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestOptions extends BaseTestQuery{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOptions.class);
+
+  @Test
+  public void testDrillbits() throws Exception{
+    test("select * from sys.drillbits;");
+  }
+
+  @Test
+  public void testOptions() throws Exception{
+    test(
+        "select * from sys.options;" +
+        "ALTER SYSTEM set `planner.disable_exchanges` = true;" +
+        "select * from sys.options;" +
+        "ALTER SESSION set `planner.disable_exchanges` = true;" +
+        "select * from sys.options;"
+        );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/resources/server/options_session_check.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/server/options_session_check.json b/exec/java-exec/src/test/resources/server/options_session_check.json
new file mode 100644
index 0000000..6cb80fd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/server/options_session_check.json
@@ -0,0 +1,20 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"options-reader-group-scan"
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "screen"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/resources/server/options_set.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/server/options_set.json b/exec/java-exec/src/test/resources/server/options_set.json
new file mode 100644
index 0000000..dda35fc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/server/options_set.json
@@ -0,0 +1,24 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        },
+        options : {
+            &REPLACED_IN_TEST&
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"options-reader-group-scan"
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "screen"
+        }
+    ]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index f70ddca..8130a33 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -272,6 +272,8 @@ public class TestJdbcQuery extends JdbcTest{
         "TABLE_SCHEMA=hive.default; TABLE_NAME=kv\n" +
         "TABLE_SCHEMA=hive.db1; TABLE_NAME=kv_db1\n" +
         "TABLE_SCHEMA=hive; TABLE_NAME=kv\n" +
+        "TABLE_SCHEMA=sys; TABLE_NAME=drillbits\n" +
+        "TABLE_SCHEMA=sys; TABLE_NAME=options\n" +
         "TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=VIEWS\n" +
         "TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=COLUMNS\n" +
         "TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=TABLES\n" +
@@ -322,6 +324,7 @@ public class TestJdbcQuery extends JdbcTest{
         "SCHEMA_NAME=dfs\n" +
         "SCHEMA_NAME=cp.default\n" +
         "SCHEMA_NAME=cp\n" +
+        "SCHEMA_NAME=sys\n" +
         "SCHEMA_NAME=INFORMATION_SCHEMA\n";
 
     JdbcAssert.withNoDefaultSchema().sql("SHOW DATABASES").returns(expected);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index fe37521..a1cf6f8 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -3155,6 +3155,21 @@ public final class BitControl {
      * <code>optional int32 time_zone = 16;</code>
      */
     int getTimeZone();
+
+    // optional string options_json = 17;
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    boolean hasOptionsJson();
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    java.lang.String getOptionsJson();
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    com.google.protobuf.ByteString
+        getOptionsJsonBytes();
   }
   /**
    * Protobuf type {@code exec.bit.control.PlanFragment}
@@ -3309,6 +3324,11 @@ public final class BitControl {
               timeZone_ = input.readInt32();
               break;
             }
+            case 138: {
+              bitField0_ |= 0x00004000;
+              optionsJson_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3648,6 +3668,49 @@ public final class BitControl {
       return timeZone_;
     }
 
+    // optional string options_json = 17;
+    public static final int OPTIONS_JSON_FIELD_NUMBER = 17;
+    private java.lang.Object optionsJson_;
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    public boolean hasOptionsJson() {
+      return ((bitField0_ & 0x00004000) == 0x00004000);
+    }
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    public java.lang.String getOptionsJson() {
+      java.lang.Object ref = optionsJson_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          optionsJson_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    public com.google.protobuf.ByteString
+        getOptionsJsonBytes() {
+      java.lang.Object ref = optionsJson_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        optionsJson_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       handle_ = org.apache.drill.exec.proto.ExecProtos.FragmentHandle.getDefaultInstance();
       networkCost_ = 0F;
@@ -3663,6 +3726,7 @@ public final class BitControl {
       queryStartTime_ = 0L;
       credentials_ = org.apache.drill.exec.proto.UserBitShared.UserCredentials.getDefaultInstance();
       timeZone_ = 0;
+      optionsJson_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3718,6 +3782,9 @@ public final class BitControl {
       if (((bitField0_ & 0x00002000) == 0x00002000)) {
         output.writeInt32(16, timeZone_);
       }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        output.writeBytes(17, getOptionsJsonBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -3783,6 +3850,10 @@ public final class BitControl {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(16, timeZone_);
       }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(17, getOptionsJsonBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3947,6 +4018,8 @@ public final class BitControl {
         bitField0_ = (bitField0_ & ~0x00001000);
         timeZone_ = 0;
         bitField0_ = (bitField0_ & ~0x00002000);
+        optionsJson_ = "";
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
 
@@ -4047,6 +4120,10 @@ public final class BitControl {
           to_bitField0_ |= 0x00002000;
         }
         result.timeZone_ = timeZone_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00004000;
+        }
+        result.optionsJson_ = optionsJson_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4107,6 +4184,11 @@ public final class BitControl {
         if (other.hasTimeZone()) {
           setTimeZone(other.getTimeZone());
         }
+        if (other.hasOptionsJson()) {
+          bitField0_ |= 0x00004000;
+          optionsJson_ = other.optionsJson_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5021,6 +5103,80 @@ public final class BitControl {
         return this;
       }
 
+      // optional string options_json = 17;
+      private java.lang.Object optionsJson_ = "";
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public boolean hasOptionsJson() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public java.lang.String getOptionsJson() {
+        java.lang.Object ref = optionsJson_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          optionsJson_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public com.google.protobuf.ByteString
+          getOptionsJsonBytes() {
+        java.lang.Object ref = optionsJson_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          optionsJson_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public Builder setOptionsJson(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00004000;
+        optionsJson_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public Builder clearOptionsJson() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        optionsJson_ = getDefaultInstance().getOptionsJson();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public Builder setOptionsJsonBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00004000;
+        optionsJson_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.control.PlanFragment)
     }
 
@@ -5753,7 +5909,7 @@ public final class BitControl {
       "shared.DrillPBError\022\024\n\014running_time\030\t \001(" +
       "\003\"k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
       "ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
-      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\"\250\003\n\014PlanF" +
+      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\"\276\003\n\014PlanF" +
       "ragment\022(\n\006handle\030\001 \001(\0132\030.exec.bit.Fragm",
       "entHandle\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_c" +
       "ost\030\005 \001(\002\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_c" +
@@ -5764,16 +5920,17 @@ public final class BitControl {
       "\003:\01020000000\022\034\n\007mem_max\030\r \001(\003:\0132000000000" +
       "0\022\030\n\020query_start_time\030\016 \001(\003\0221\n\013credentia" +
       "ls\030\017 \001(\0132\034.exec.shared.UserCredentials\022\021" +
-      "\n\ttime_zone\030\020 \001(\005\"f\n\017WorkQueueStatus\022(\n\010",
-      "endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024" +
-      "\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(" +
-      "\003*\332\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n" +
-      "\007GOODBYE\020\002\022\033\n\027REQ_INIATILIZE_FRAGMENT\020\003\022" +
-      "\027\n\023REQ_CANCEL_FRAGMENT\020\006\022\027\n\023REQ_FRAGMENT" +
-      "_STATUS\020\007\022\022\n\016REQ_BIT_STATUS\020\010\022\030\n\024RESP_FR" +
-      "AGMENT_HANDLE\020\t\022\030\n\024RESP_FRAGMENT_STATUS\020" +
-      "\n\022\023\n\017RESP_BIT_STATUS\020\013B+\n\033org.apache.dri" +
-      "ll.exec.protoB\nBitControlH\001"
+      "\n\ttime_zone\030\020 \001(\005\022\024\n\014options_json\030\021 \001(\t\"",
+      "f\n\017WorkQueueStatus\022(\n\010endpoint\030\001 \001(\0132\026.e" +
+      "xec.DrillbitEndpoint\022\024\n\014queue_length\030\002 \001" +
+      "(\005\022\023\n\013report_time\030\003 \001(\003*\332\001\n\007RpcType\022\r\n\tH" +
+      "ANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\033\n\027REQ_" +
+      "INIATILIZE_FRAGMENT\020\003\022\027\n\023REQ_CANCEL_FRAG" +
+      "MENT\020\006\022\027\n\023REQ_FRAGMENT_STATUS\020\007\022\022\n\016REQ_B" +
+      "IT_STATUS\020\010\022\030\n\024RESP_FRAGMENT_HANDLE\020\t\022\030\n" +
+      "\024RESP_FRAGMENT_STATUS\020\n\022\023\n\017RESP_BIT_STAT" +
+      "US\020\013B+\n\033org.apache.drill.exec.protoB\nBit" +
+      "ControlH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5803,7 +5960,7 @@ public final class BitControl {
           internal_static_exec_bit_control_PlanFragment_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_PlanFragment_descriptor,
-              new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "Assignment", "LeafFragment", "Foreman", "MemInitial", "MemMax", "QueryStartTime", "Credentials", "TimeZone", });
+              new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "Assignment", "LeafFragment", "Foreman", "MemInitial", "MemMax", "QueryStartTime", "Credentials", "TimeZone", "OptionsJson", });
           internal_static_exec_bit_control_WorkQueueStatus_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_exec_bit_control_WorkQueueStatus_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index a738646..77d7e9d 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -77,6 +77,7 @@ message PlanFragment {
   optional int64 query_start_time = 14; // start time of query in milliseconds
   optional exec.shared.UserCredentials credentials = 15;
   optional int32 time_zone = 16;
+  optional string options_json = 17;
 }
 
 message WorkQueueStatus{