Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/07 03:20:55 UTC

[1/6] git commit: DRILL-643: Fix ByteBuf allocation in o.a.d.exec.vector.ValueHolderHelper.getVarCharHolder()

Repository: incubator-drill
Updated Branches:
  refs/heads/master e790e7962 -> 08923cb82


DRILL-643: Fix ByteBuf allocation in o.a.d.exec.vector.ValueHolderHelper.getVarCharHolder()


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3af2344f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3af2344f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3af2344f

Branch: refs/heads/master
Commit: 3af2344f27749d63890a379c5b9ed4d92b142a73
Parents: e790e79
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue May 6 01:17:18 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 15:39:45 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/vector/ValueHolderHelper.java     | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3af2344f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
index e4af851..fb9dfd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
@@ -17,35 +17,34 @@
  */
 package org.apache.drill.exec.vector;
 
-import java.nio.ByteOrder;
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.SwappedByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 
+import java.math.BigDecimal;
+import java.nio.ByteOrder;
+
 import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.expr.holders.IntervalDayHolder;
-import org.apache.drill.exec.expr.holders.Decimal9Holder;
 import org.apache.drill.exec.expr.holders.Decimal18Holder;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.expr.holders.IntervalDayHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
 
 import com.google.common.base.Charsets;
 
-import java.math.BigDecimal;
-
 
 public class ValueHolderHelper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueHolderHelper.class);
-  
+
   public static VarCharHolder getVarCharHolder(String s){
     VarCharHolder vch = new VarCharHolder();
-    
+
     byte[] b = s.getBytes(Charsets.UTF_8);
     vch.start = 0;
     vch.end = b.length;
-    vch.buffer = UnpooledByteBufAllocator.DEFAULT.buffer(s.length()).order(ByteOrder.LITTLE_ENDIAN); // use the length of input string to allocate buffer. 
+    vch.buffer = UnpooledByteBufAllocator.DEFAULT.buffer(b.length).order(ByteOrder.LITTLE_ENDIAN); // allocate using the UTF-8 byte length, not the character count of the input string.
     vch.buffer.setBytes(0, b);
     return vch;
   }

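The one-line change above matters because String.length() counts UTF-16 code units while the buffer has to hold UTF-8 bytes; as soon as the input is not pure ASCII the old allocation is smaller than the byte array copied into it by setBytes(). A minimal JDK-only illustration (not Drill code; StandardCharsets is used here instead of Guava's Charsets only to keep the snippet self-contained):

    import java.nio.charset.StandardCharsets;

    // Character count vs. UTF-8 byte count diverge for non-ASCII input.
    public class CharsVsBytes {
      public static void main(String[] args) {
        String s = "भारत";                                   // 4 characters
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        System.out.println(s.length());                      // 4  (what was allocated before)
        System.out.println(b.length);                        // 12 (what the buffer must hold)
      }
    }
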

[2/6] git commit: DRILL-644: Fix char length and position calculation for UTF-8 strings in StringFunctionUtil

Posted by ja...@apache.org.
DRILL-644: Fix char length and position calculation for UTF-8 strings in StringFunctionUtil


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0f65fd8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0f65fd8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0f65fd8d

Branch: refs/heads/master
Commit: 0f65fd8d3bb40300e5455f31603d41dd9a149bb6
Parents: 3af2344
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue May 6 01:25:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 15:40:10 2014 -0700

----------------------------------------------------------------------
 .../exec/expr/fn/impl/StringFunctionUtil.java   | 48 +++++++++++---------
 .../exec/physical/impl/TestStringFunctions.java | 37 ++++++---------
 .../resources/functions/string/testSubstr.json  |  9 ++--
 3 files changed, 46 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f65fd8d/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
index 0096a13..fbdab8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.expr.fn.impl;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
 import io.netty.buffer.ByteBuf;
 
 public class StringFunctionUtil {
@@ -26,15 +28,9 @@ public class StringFunctionUtil {
   public static int getUTF8CharLength(ByteBuf buffer, int start, int end) {
     int charCount = 0;
 
-    for (int id = start; id < end; id++) {
-      byte  currentByte = buffer.getByte(id);
-
-      if (currentByte < 0x128  ||           // 1-byte char. First byte is 0xxxxxxx.
-          (currentByte & 0xE0) == 0xC0 ||   // 2-byte char. First byte is 110xxxxx
-          (currentByte & 0xF0) == 0xE0 ||   // 3-byte char. First byte is 1110xxxx
-          (currentByte & 0xF8) == 0xF0) {   //4-byte char. First byte is 11110xxx
-        charCount ++;  //Advance the counter, since we find one char.
-      }
+    for (int idx = start, charLen = 0; idx < end; idx += charLen) {
+      charLen = utf8CharLen(buffer, idx);
+      ++charCount;  //Advance the counter, since we find one char.
     }
     return charCount;
   }
@@ -46,21 +42,14 @@ public class StringFunctionUtil {
   public static int getUTF8CharPosition(ByteBuf buffer, int start, int end, int charLength) {
     int charCount = 0;
 
-    if (start >=end)
+    if (start >= end)
       return -1;  //wrong input here.
 
-    for (int id = start; id < end; id++) {
-
-      byte  currentByte = buffer.getByte(id);
-
-      if (currentByte < 0x128  ||           // 1-byte char. First byte is 0xxxxxxx.
-          (currentByte & 0xE0) == 0xC0 ||   // 2-byte char. First byte is 110xxxxx
-          (currentByte & 0xF0) == 0xE0 ||   // 3-byte char. First byte is 1110xxxx
-          (currentByte & 0xF8) == 0xF0) {   //4-byte char. First byte is 11110xxx
-        charCount ++;  //Advance the counter, since we find one char.
-        if (charCount == charLength + 1) {
-          return id;
-        }
+    for (int idx = start, charLen = 0; idx < end; idx += charLen) {
+      charLen = utf8CharLen(buffer, idx);
+      ++charCount;  //Advance the counter, since we find one char.
+      if (charCount == charLength + 1) {
+        return idx;
       }
     }
     return end;
@@ -154,4 +143,19 @@ public class StringFunctionUtil {
     return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9');
   }
 
+  private static int utf8CharLen(ByteBuf buffer, int idx) {
+    byte firstByte = buffer.getByte(idx);
+    if (firstByte >= 0) { // 1-byte char. First byte is 0xxxxxxx.
+      return 1;
+    } else if ((firstByte & 0xE0) == 0xC0) { // 2-byte char. First byte is 110xxxxx
+      return 2;
+    } else if ((firstByte & 0xF0) == 0xE0) { // 3-byte char. First byte is 1110xxxx
+      return 3;
+    } else if ((firstByte & 0xF8) == 0xF0) { //4-byte char. First byte is 11110xxx
+      return 4;
+    }
+    throw new DrillRuntimeException("Unexpected byte 0x" + Integer.toString((int)firstByte & 0xff, 16)
+        + " at position " + idx + " encountered while decoding UTF8 string.");
+  }
+
 }

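For reference, the removed condition `currentByte < 0x128` compares a signed Java byte (range -128..127) against 296, so it was always true and every byte of a multi-byte character was counted as a character of its own. The new utf8CharLen() classifies only lead bytes and then skips the continuation bytes; the masks still work with Java's signed bytes because the & zeroes the sign-extended high bits after promotion to int. A standalone sketch of the same lead-byte rule, kept separate from the Drill sources:

    import java.nio.charset.StandardCharsets;

    // Counts UTF-8 characters by looking only at lead bytes, mirroring the
    // classification utf8CharLen() performs above.
    public class Utf8CharCount {

      static int countUtf8Chars(byte[] bytes) {
        int count = 0;
        for (int i = 0; i < bytes.length; count++) {
          byte lead = bytes[i];
          if (lead >= 0)                  i += 1;  // 0xxxxxxx
          else if ((lead & 0xE0) == 0xC0) i += 2;  // 110xxxxx
          else if ((lead & 0xF0) == 0xE0) i += 3;  // 1110xxxx
          else if ((lead & 0xF8) == 0xF0) i += 4;  // 11110xxx
          else throw new IllegalArgumentException("malformed UTF-8 at offset " + i);
        }
        return count;
      }

      public static void main(String[] args) {
        byte[] b = "भारतवर्ष".getBytes(StandardCharsets.UTF_8);
        System.out.println(b.length);             // 24 bytes
        System.out.println(countUtf8Chars(b));    // 8 characters
      }
    }
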
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f65fd8d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
index 09d1361..cd310b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestStringFunctions.java
@@ -17,45 +17,36 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
-import com.codahale.metrics.MetricRegistry;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
-import org.junit.AfterClass;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
-import org.junit.Assert;
+import org.junit.rules.TestRule;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
 
 public class TestStringFunctions extends ExecTest {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStringFunctions.class);
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStringFunctions.class);
 
   DrillConfig c = DrillConfig.create();
   PhysicalPlanReader reader;
@@ -72,7 +63,7 @@ public class TestStringFunctions extends ExecTest {
     int i = 0;
     for (ValueVector v : exec) {
       if  (v instanceof VarCharVector) {
-        res[i++] = new String( ((VarCharVector) v).getAccessor().get(0));
+        res[i++] = new String( ((VarCharVector) v).getAccessor().get(0), Charsets.UTF_8);
       } else
         res[i++] =  v.getAccessor().getObject(0);
     }
@@ -98,10 +89,10 @@ public class TestStringFunctions extends ExecTest {
 
     while(exec.next()){
       Object [] res = getRunResult(exec);
-      assertEquals("return count does not match", res.length, expectedResults.length);
+      assertEquals("return count does not match", expectedResults.length, res.length);
 
       for (int i = 0; i<res.length; i++) {
-        assertEquals(String.format("column %s does not match", i),  res[i], expectedResults[i]);
+        assertEquals(String.format("column %s does not match", i), expectedResults[i],  res[i]);
       }
     }
 
@@ -199,7 +190,7 @@ public class TestStringFunctions extends ExecTest {
   @Test
   public void testSubstr(@Injectable final DrillbitContext bitContext,
                            @Injectable UserServer.UserClientConnection connection) throws Throwable{
-    Object [] expected = new Object[] {"abc", "bcd", "bcdef", "bcdef", "", "", "", ""};
+    Object [] expected = new Object[] {"abc", "bcd", "bcdef", "bcdef", "", "", "", "", "भारत", "वर्ष", "वर्ष"};
 
     runTest(bitContext, connection, expected, "functions/string/testSubstr.json");
   }

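Two smaller test fixes ride along here: the VarCharVector bytes are now decoded with an explicit UTF-8 charset, since new String(byte[]) uses the platform default charset and can garble the new Devanagari expectations on machines whose default is not UTF-8, and the assertEquals calls are reordered to (expected, actual) so JUnit failure messages report the values the right way around. A plain-JDK reminder of the charset difference (not Drill code):

    import java.nio.charset.StandardCharsets;

    public class DecodeCharset {
      public static void main(String[] args) {
        byte[] utf8 = "भारत".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(utf8, StandardCharsets.UTF_8)); // always "भारत"
        System.out.println(new String(utf8)); // platform default charset; may be mojibake
      }
    }
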
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f65fd8d/exec/java-exec/src/test/resources/functions/string/testSubstr.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/functions/string/testSubstr.json b/exec/java-exec/src/test/resources/functions/string/testSubstr.json
index e885381..94467ae 100644
--- a/exec/java-exec/src/test/resources/functions/string/testSubstr.json
+++ b/exec/java-exec/src/test/resources/functions/string/testSubstr.json
@@ -23,15 +23,18 @@
             child: 1,
             pop:"project",
             exprs: [
-        			{ ref: "col1", expr: "substring('abcdef', 1, 3)"},
+              { ref: "col1", expr: "substring('abcdef', 1, 3)"},
               { ref: "col2", expr: "substring('abcdef', 2, 3)"},
               { ref: "col3", expr: "substring('abcdef', 2, 5)"},
               { ref: "col4", expr: "substring('abcdef', 2, 10)"},
               { ref: "col5", expr: "substring('abcdef', 0, 3)"},
               { ref: "col6", expr: "substring('abcdef', -1, 3)"},
               { ref: "col7", expr: "substring('', 1, 2)"},
-              { ref: "col8", expr: "substring('abcdef', 10, 2)"}
-    			]
+              { ref: "col8", expr: "substring('abcdef', 10, 2)"},
+              { ref: "col9", expr: "substring('भारतवर्ष', 1, 4)"},
+              { ref: "col10", expr: "substring('भारतवर्ष', 5, 4)"},
+              { ref: "col11", expr: "substring('भारतवर्ष', 5, 5)"}
+            ]
         },
         {
             @id: 3,


[6/6] git commit: DRILL-381: Implement SYSTEM and SESSION options.

Posted by ja...@apache.org.
DRILL-381: Implement SYSTEM and SESSION options.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/08923cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/08923cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/08923cb8

Branch: refs/heads/master
Commit: 08923cb82a007171e699f5ac0b4e4380b3a5a448
Parents: 9b34f23
Author: Jason Altekruse <al...@gmail.com>
Authored: Sat Mar 29 15:02:58 2014 -0500
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 18:15:16 2014 -0700

----------------------------------------------------------------------
 .../drill/common/logical/PlanProperties.java    |  24 +-
 .../drill/exec/cache/DistributedCache.java      |   9 +-
 .../apache/drill/exec/cache/DistributedMap.java |   3 +-
 .../exec/cache/DistributedMapDeserializer.java  |  28 +++
 .../org/apache/drill/exec/cache/HazelCache.java |  55 ++++-
 .../exec/cache/JacksonAdvancedSerializer.java   |  65 +++++
 .../exec/cache/JacksonDrillSerializable.java    |  17 --
 .../drill/exec/cache/JacksonSerializable.java   |  54 +++++
 .../org/apache/drill/exec/cache/LocalCache.java |  94 ++++++-
 .../exec/cache/SerializationDefinition.java     |  36 +++
 .../apache/drill/exec/ops/FragmentContext.java  |  22 +-
 .../org/apache/drill/exec/ops/QueryContext.java |   9 +-
 .../drill/exec/ops/QueryContext.java.orig       | 135 +++++++++++
 .../apache/drill/exec/opt/BasicOptimizer.java   |  63 ++++-
 .../org/apache/drill/exec/opt/Optimizer.java    |  12 +-
 .../drill/exec/planner/PhysicalPlanReader.java  |  14 +-
 .../drill/exec/planner/SimpleExecPlanner.java   |  15 +-
 .../planner/fragment/SimpleParallelizer.java    |  11 +-
 .../exec/planner/logical/StoragePlugins.java    |  13 +-
 .../exec/planner/physical/PlannerSettings.java  |  17 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |   4 +-
 .../planner/sql/handlers/ExplainHandler.java    |  12 +-
 .../planner/sql/handlers/SetOptionHandler.java  |  20 +-
 .../sql/handlers/SimpleCommandResult.java       |   1 -
 .../apache/drill/exec/rpc/user/UserServer.java  |  10 +-
 .../apache/drill/exec/rpc/user/UserSession.java |  16 +-
 .../org/apache/drill/exec/server/Drillbit.java  |   2 +
 .../apache/drill/exec/server/Drillbit.java.orig | 141 +++++++++++
 .../drill/exec/server/DrillbitContext.java      |  12 +-
 .../drill/exec/server/DrillbitContext.java.orig | 157 ++++++++++++
 .../server/options/DrillConfigIterator.java     |  85 +++++++
 .../server/options/FragmentOptionsManager.java  |  81 +++++++
 .../drill/exec/server/options/OptionList.java   |  23 ++
 .../exec/server/options/OptionManager.java      |  37 +++
 .../exec/server/options/OptionValidator.java    |  65 +++++
 .../drill/exec/server/options/OptionValue.java  | 130 ++++++++++
 .../server/options/SessionOptionManager.java    |  98 ++++++++
 .../exec/server/options/SetOptionException.java |  62 +++++
 .../server/options/SystemOptionManager.java     | 154 ++++++++++++
 .../exec/server/options/TypeValidators.java     | 132 ++++++++++
 .../drill/exec/store/StoragePluginRegistry.java |  34 +--
 .../exec/store/StoragePluginRegistry.java.orig  | 243 +++++++++++++++++++
 .../exec/store/direct/DirectGroupScan.java      |   7 +
 .../drill/exec/store/mock/MockGroupScanPOP.java |  27 ++-
 .../drill/exec/store/pojo/AbstractWriter.java   |   2 +-
 .../drill/exec/store/pojo/PojoDataType.java     |  72 ++++++
 .../drill/exec/store/pojo/PojoRecordReader.java |  20 +-
 .../apache/drill/exec/store/pojo/Writers.java   | 133 +++++++++-
 .../drill/exec/store/sys/DrillbitIterator.java  |  61 +++++
 .../drill/exec/store/sys/StaticDrillTable.java  |  41 ++++
 .../drill/exec/store/sys/SystemTable.java       |  55 +++++
 .../exec/store/sys/SystemTableBatchCreator.java |  44 ++++
 .../drill/exec/store/sys/SystemTablePlugin.java | 118 +++++++++
 .../exec/store/sys/SystemTablePluginConfig.java |  32 +++
 .../drill/exec/store/sys/SystemTableScan.java   | 128 ++++++++++
 .../exec/work/batch/ControlHandlerImpl.java     |  40 +--
 .../exec/work/batch/ControlMessageHandler.java  |   4 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  12 +-
 .../work/fragment/NonRootFragmentManager.java   |   6 +-
 .../apache/drill/exec/work/user/UserWorker.java |   5 +
 .../java/org/apache/drill/PlanningBase.java     |  18 +-
 .../org/apache/drill/TestTpchSingleMode.java    |   3 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   6 +-
 .../drill/exec/pop/TestFragmentChecker.java     |  29 +--
 .../apache/drill/exec/server/TestOptions.java   |  41 ++++
 .../resources/server/options_session_check.json |  20 ++
 .../src/test/resources/server/options_set.json  |  24 ++
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |   3 +
 .../org/apache/drill/exec/proto/BitControl.java | 181 +++++++++++++-
 protocol/src/main/protobuf/BitControl.proto     |   1 +
 70 files changed, 3142 insertions(+), 206 deletions(-)
----------------------------------------------------------------------

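The file list above introduces a layered option model: SystemOptionManager for cluster-wide defaults (OptionValue gets a SerializationDefinition entry so it can live in the distributed cache), SessionOptionManager layered on top per user session, and FragmentOptionsManager rebuilt on each drillbit from the options JSON now carried on PlanFragment (see the FragmentContext change below and the one-line BitControl.proto addition). A minimal, self-contained sketch of the shadowing behaviour this layering implies; the class below is an analogy, not the actual Drill API, and the real managers also carry validators, typed OptionValues and cache persistence:

    import java.util.HashMap;
    import java.util.Map;

    // Analogy only: SESSION values shadow SYSTEM defaults, anything unset at the
    // session level falls through to the system default.
    public class LayeredOptionsSketch {
      private final Map<String, String> system = new HashMap<String, String>();
      private final Map<String, String> session = new HashMap<String, String>();

      public void setSystem(String name, String value)  { system.put(name, value); }
      public void setSession(String name, String value) { session.put(name, value); }

      public String get(String name) {
        String v = session.get(name);
        return v != null ? v : system.get(name);
      }

      public static void main(String[] args) {
        LayeredOptionsSketch opts = new LayeredOptionsSketch();
        opts.setSystem("example.option", "system-default");   // hypothetical option name
        System.out.println(opts.get("example.option"));       // system-default
        opts.setSession("example.option", "session-override");
        System.out.println(opts.get("example.option"));       // session-override
      }
    }
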

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
index 606f2df..a2942b6 100644
--- a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
+++ b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,10 +17,9 @@
  ******************************************************************************/
 package org.apache.drill.common.logical;
 
+import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
@@ -33,12 +32,13 @@ public class PlanProperties {
   public int version;
 	public Generator generator;
 	public ResultMode resultMode;
+  public JSONOptions options;
 
 //	@JsonInclude(Include.NON_NULL)
 	public static class Generator{
 		public String type;
 		public String info;
-	  
+
 		public static enum ResultMode{
 	    EXEC, LOGICAL, PHYSICAL;
 	  }
@@ -52,12 +52,14 @@ public class PlanProperties {
   private PlanProperties(@JsonProperty("version") int version,
                          @JsonProperty("generator") Generator generator,
                          @JsonProperty("type") PlanType type,
-                         @JsonProperty("mode") ResultMode resultMode
+                         @JsonProperty("mode") ResultMode resultMode,
+                         @JsonProperty("options") JSONOptions options
                          ) {
     this.version = version;
     this.generator = generator;
     this.type = type;
     this.resultMode = resultMode == null ? ResultMode.EXEC : resultMode;
+    this.options = options;
   }
 
   public static PlanPropertiesBuilder builder() {
@@ -69,6 +71,7 @@ public class PlanProperties {
     private Generator generator;
     private PlanType type;
     private ResultMode mode = ResultMode.EXEC;
+    private JSONOptions options;
 
     public PlanPropertiesBuilder type(PlanType type) {
       this.type = type;
@@ -84,19 +87,24 @@ public class PlanProperties {
       this.generator = new Generator(type, info);
       return this;
     }
-    
+
     public PlanPropertiesBuilder resultMode(ResultMode mode){
       this.mode = mode;
       return this;
     }
 
+    public PlanPropertiesBuilder options(JSONOptions options){
+      this.options = options;
+      return this;
+    }
+
     public PlanPropertiesBuilder generator(Generator generator) {
       this.generator = generator;
       return this;
     }
 
     public PlanProperties build() {
-      return new PlanProperties(version, generator, type, mode);
+      return new PlanProperties(version, generator, type, mode, options);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index fdace08..65362e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -26,15 +26,16 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 public interface DistributedCache extends Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedCache.class);
-  
+
   public void run() throws DrillbitStartupException;
-  
+
 //  public void updateLocalQueueLength(int length);
-//  public List<WorkQueueStatus> getQueueLengths(); 
-  
+//  public List<WorkQueueStatus> getQueueLengths();
+
   public PlanFragment getFragment(FragmentHandle handle);
   public void storeFragment(PlanFragment fragment);
   public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz);
   public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz);
+  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz);
   public Counter getCounter(String name);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
index 8ea9cd1..1bee5fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -17,9 +17,10 @@
  */
 package org.apache.drill.exec.cache;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public interface DistributedMap<V extends DrillSerializable> {
+public interface DistributedMap<V extends DrillSerializable> extends Iterable<Map.Entry<String, V>>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
   public V get(String key);
   public void put(String key, V value);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
new file mode 100644
index 0000000..453dbfb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+
+public interface DistributedMapDeserializer<V extends DrillSerializable> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMapDeserializer.class);
+
+  public V put(String key, byte[] value) throws IOException;
+
+  public Class getValueClass();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index b701c8f..0149a57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -19,7 +19,9 @@ package org.apache.drill.exec.cache;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -45,6 +47,7 @@ import com.hazelcast.core.IMap;
 import com.hazelcast.core.ITopic;
 import com.hazelcast.core.Message;
 import com.hazelcast.core.MessageListener;
+import com.hazelcast.nio.serialization.StreamSerializer;
 
 public class HazelCache implements DistributedCache {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -55,12 +58,26 @@ public class HazelCache implements DistributedCache {
   private HandlePlan fragments;
   private Cache<WorkQueueStatus, Integer>  endpoints;
   private BufferAllocator allocator;
+  private DrillConfig config;
 
   public HazelCache(DrillConfig config, BufferAllocator allocator) {
     this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
     this.allocator = allocator;
+    this.config = config;
   }
 
+  private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
+    SerializerConfig sc = new SerializerConfig().setImplementation(serializer).setTypeClass(clazz);
+    c.getSerializationConfig().addSerializerConfig(sc);
+  }
+
+  @SuppressWarnings("rawtypes")
+  private <T> void addJSer(Config c, SerializationDefinition d){
+    SerializerConfig sc = new SerializerConfig().setImplementation(new JacksonAdvancedSerializer(d, config.getMapper())).setTypeClass(d.clazz);
+    c.getSerializationConfig().addSerializerConfig(sc);
+  }
+
+
   private class Listener implements MessageListener<HWorkQueueStatus>{
 
     @Override
@@ -68,16 +85,16 @@ public class HazelCache implements DistributedCache {
       logger.debug("Received new queue length message.");
       endpoints.put(wrapped.getMessageObject().get(), 0);
     }
-    
+
   }
-  
+
   public void run() {
     Config c = new Config();
-    SerializerConfig sc = new SerializerConfig() // 
-      .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
-      .setTypeClass(VectorAccessibleSerializable.class);
+    addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
+    addJSer(c, SerializationDefinition.OPTION);
+    addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
+
     c.setInstanceName(instanceName);
-    c.getSerializationConfig().addSerializerConfig(sc);
     c.getGroupConfig().setName(instanceName);
     for (String s : DrillConfig.create().getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
       logger.debug("Adding interface: {}", s);
@@ -128,7 +145,7 @@ public class HazelCache implements DistributedCache {
   public void storeFragment(PlanFragment fragment) {
     fragments.put(fragment.getHandle(), fragment);
   }
-  
+
 
   @Override
   public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
@@ -138,12 +155,18 @@ public class HazelCache implements DistributedCache {
 
   @Override
   public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
-    IMap<String, V> imap = this.instance.getMap(clazz.toString());
+    return getNamedMap(clazz.getName(), clazz);
+  }
+
+
+  @Override
+  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
+    IMap<String, V> imap = this.instance.getMap(name);
     MapConfig myMapConfig = new MapConfig();
     myMapConfig.setBackupCount(0);
     myMapConfig.setReadBackupData(true);
     instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
-    return new HCDistributedMapImpl<V>(imap, clazz);
+    return new HCDistributedMapImpl<V>(imap);
   }
 
   @Override
@@ -151,10 +174,12 @@ public class HazelCache implements DistributedCache {
     return new HCCounterImpl(this.instance.getAtomicLong(name));
   }
 
+
+
   public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
-    private IMap<String, V> m;
+    private final IMap<String, V> m;
 
-    public HCDistributedMapImpl(IMap<String, V> m, Class<V> clazz) {
+    public HCDistributedMapImpl(IMap<String, V> m) {
       this.m = m;
     }
 
@@ -172,7 +197,15 @@ public class HazelCache implements DistributedCache {
 
     public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
       m.putIfAbsent(key, value, ttl, timeunit);
+
+    }
+
+    @Override
+    public Iterator<Entry<String, V>> iterator() {
+      return m.entrySet().iterator();
     }
+
+
   }
 
   public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
new file mode 100644
index 0000000..edf31aa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+
+import org.apache.drill.common.util.DataInputInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+public class JacksonAdvancedSerializer<T> implements StreamSerializer<T>  {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonAdvancedSerializer.class);
+
+  private final Class<?> clazz;
+  private final ObjectMapper mapper;
+  private final int id;
+
+  public JacksonAdvancedSerializer(SerializationDefinition def, ObjectMapper mapper){
+    this.clazz =  def.clazz;
+    this.mapper = mapper;
+    this.id = def.id;
+  }
+
+  @Override
+  public int getTypeId() {
+    return id;
+  }
+
+  @Override
+  public void destroy() {
+  }
+
+  @Override
+  public void write(ObjectDataOutput out, T object) throws IOException {
+    out.write(mapper.writeValueAsBytes(object));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public T read(ObjectDataInput in) throws IOException {
+    return (T) mapper.readValue(DataInputInputStream.constructInputStream(in), clazz);
+  }
+
+
+
+
+}

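JacksonAdvancedSerializer delegates entirely to an ObjectMapper: writeValueAsBytes() on the way out and readValue() against the class registered in SerializationDefinition on the way back in. The same round trip in isolation, using a throwaway POJO rather than a Drill type:

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JacksonRoundTrip {
      // Stand-in value class; not part of the Drill commit.
      public static class Opt {
        public String name;
        public long value;
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Opt out = new Opt();
        out.name = "example.option";   // hypothetical name
        out.value = 42;
        byte[] bytes = mapper.writeValueAsBytes(out);   // what write() hands to Hazelcast
        Opt in = mapper.readValue(bytes, Opt.class);    // what read() reconstructs
        System.out.println(in.name + " = " + in.value);
      }
    }
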
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
index a7b0be2..dcfc1ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
@@ -66,21 +66,4 @@ public abstract class JacksonDrillSerializable<T> implements DrillSerializable,
     return obj;
   }
 
-  public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
-
-    public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
-      super(context, obj);
-    }
-
-    public StoragePluginsSerializable(BufferAllocator allocator) {
-    }
-
-    public StoragePluginsSerializable() {
-    }
-
-    @Override
-    public void readFromStream(InputStream input) throws IOException {
-      readFromStream(input, StoragePlugins.class);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
new file mode 100644
index 0000000..831db84
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+public abstract class JacksonSerializable implements DrillSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonSerializable.class);
+
+  private void fail(){
+    throw new UnsupportedOperationException("Need to register serialization config for class " + this.getClass().getName()); // rely on external serializer
+
+  }
+
+  @Override
+  public void readData(ObjectDataInput input) throws IOException {
+    fail();
+  }
+
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    fail();
+  }
+
+  @Override
+  public void writeData(ObjectDataOutput output) throws IOException {
+    fail();
+  }
+
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    fail();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 119764b..0fb4b82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -17,14 +17,15 @@
  */
 package org.apache.drill.exec.cache;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -38,6 +39,7 @@ import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -49,11 +51,14 @@ public class LocalCache implements DistributedCache {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
 
   private volatile Map<FragmentHandle, PlanFragment> handles;
+  private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps;
   private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
   private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
   private volatile ConcurrentMap<String, Counter> counters;
   private static final BufferAllocator allocator = new TopLevelAllocator();
 
+  private static final ObjectMapper mapper = DrillConfig.create().getMapper();
+
   @Override
   public void close() throws IOException {
     handles = null;
@@ -65,6 +70,7 @@ public class LocalCache implements DistributedCache {
     maps = Maps.newConcurrentMap();
     multiMaps = Maps.newConcurrentMap();
     counters = Maps.newConcurrentMap();
+    namedMaps = Maps.newConcurrentMap();
   }
 
   @Override
@@ -78,7 +84,7 @@ public class LocalCache implements DistributedCache {
 //    logger.debug("Storing fragment: {}", fragment);
     handles.put(fragment.getHandle(), fragment);
   }
-  
+
   @Override
   public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
     DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz);
@@ -101,6 +107,18 @@ public class LocalCache implements DistributedCache {
     }
   }
 
+
+  @Override
+  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
+    DistributedMap m = namedMaps.get(clazz);
+    if (m == null) {
+      namedMaps.putIfAbsent(name, new LocalDistributedMapImpl<V>(clazz));
+      return (DistributedMap<V>) namedMaps.get(name);
+    } else {
+      return m;
+    }
+  }
+
   @Override
   public Counter getCounter(String name) {
     Counter c = counters.get(name);
@@ -113,6 +131,16 @@ public class LocalCache implements DistributedCache {
   }
 
   public static ByteArrayDataOutput serialize(DrillSerializable obj) {
+    if(obj instanceof JacksonSerializable){
+      try{
+        ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        out.write(mapper.writeValueAsBytes(obj));
+        return out;
+      }catch(Exception e){
+        throw new RuntimeException(e);
+      }
+    }
+
     ByteArrayDataOutput out = ByteStreams.newDataOutput();
     OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
     try {
@@ -129,6 +157,14 @@ public class LocalCache implements DistributedCache {
   }
 
   public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
+    if(JacksonSerializable.class.isAssignableFrom(clazz)){
+      try{
+        return (V) mapper.readValue(bytes, clazz);
+      }catch(Exception e){
+        throw new RuntimeException(e);
+      }
+    }
+
     ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
     InputStream inputStream = DataInputInputStream.constructInputStream(in);
     try {
@@ -165,8 +201,8 @@ public class LocalCache implements DistributedCache {
   }
 
   public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
-    private ConcurrentMap<String, ByteArrayDataOutput> m;
-    private Class<V> clazz;
+    protected ConcurrentMap<String, ByteArrayDataOutput> m;
+    protected Class<V> clazz;
 
     public LocalDistributedMapImpl(Class<V> clazz) {
       m = Maps.newConcurrentMap();
@@ -196,6 +232,56 @@ public class LocalCache implements DistributedCache {
       m.putIfAbsent(key, serialize(value));
       logger.warn("Expiration not implemented in local map cache");
     }
+
+    private class DeserializingTransformer implements Iterator<Map.Entry<String, V> >{
+      private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner;
+
+      public DeserializingTransformer(Iterator<Entry<String, ByteArrayDataOutput>> inner) {
+        super();
+        this.inner = inner;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return inner.hasNext();
+      }
+
+      @Override
+      public Entry<String, V> next() {
+        return newEntry(inner.next());
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+
+      public Entry<String, V> newEntry(final Entry<String, ByteArrayDataOutput> input) {
+        return new Map.Entry<String, V>(){
+
+          @Override
+          public String getKey() {
+            return input.getKey();
+          }
+
+          @Override
+          public V getValue() {
+            return deserialize(input.getValue().toByteArray(), clazz);
+          }
+
+          @Override
+          public V setValue(V value) {
+            throw new UnsupportedOperationException();
+          }
+
+        };
+      }
+
+    }
+    @Override
+    public Iterator<Entry<String, V>> iterator() {
+      return new DeserializingTransformer(m.entrySet().iterator());
+    }
   }
 
   public static class LocalCounterImpl implements Counter {

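The DeserializingTransformer above is a hand-rolled mapping iterator: entries stay serialized in the backing map and are only decoded when getValue() is called during iteration. The same shape can be expressed with Guava's Iterators.transform (which decodes at next() rather than at getValue()); shown here as a sketch with plain Strings standing in for serialized values, not as a suggested change:

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Iterator;
    import java.util.Map;

    import com.google.common.base.Function;
    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Iterators;

    public class LazyDecodeDemo {
      public static void main(String[] args) {
        Map<String, String> stored = ImmutableMap.of("a", "1", "b", "2"); // "serialized" values
        Iterator<Map.Entry<String, Integer>> it = Iterators.transform(
            stored.entrySet().iterator(),
            new Function<Map.Entry<String, String>, Map.Entry<String, Integer>>() {
              public Map.Entry<String, Integer> apply(Map.Entry<String, String> e) {
                // decode only as each entry is consumed
                return new SimpleImmutableEntry<String, Integer>(e.getKey(), Integer.valueOf(e.getValue()));
              }
            });
        while (it.hasNext()) {
          Map.Entry<String, Integer> e = it.next();
          System.out.println(e.getKey() + " -> " + e.getValue());
        }
      }
    }
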
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
new file mode 100644
index 0000000..95ba434
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.options.OptionValue;
+
+public enum SerializationDefinition {
+
+  OPTION(3002, OptionValue.class),
+  STORAGE_PLUGINS(3003, StoragePlugins.class)
+  ;
+  public final int id;
+  public final Class<?> clazz;
+
+  SerializationDefinition(int id, Class<?> clazz){
+    this.id = id;
+    this.clazz = clazz;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index f3bcfef..2a6ab0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,6 +27,7 @@ import net.hydromatic.optiq.tools.Frameworks;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.compile.QueryClassLoader;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -42,6 +43,9 @@ import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.FragmentOptionsManager;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 import com.beust.jcommander.internal.Lists;
@@ -68,12 +72,13 @@ public class FragmentContext implements Closeable {
   private IncomingBuffers buffers;
   private final long queryStartTime;
   private final int rootFragmentTimeZone;
+  private final OptionManager sessionOptions;
 
   private volatile Throwable failureCause;
   private volatile boolean failed = false;
 
   public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
-      FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException {
+      FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
     this.loader = new QueryClassLoader(true);
     this.transformer = new ClassTransformer();
     this.stats = new FragmentStats(dbContext.getMetrics());
@@ -85,9 +90,24 @@ public class FragmentContext implements Closeable {
     this.rootFragmentTimeZone = fragment.getTimeZone();
     logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
     logger.debug("Fragment max allocation: {}", fragment.getMemMax());
+    try{
+      OptionList list;
+      if(!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()){
+        list = new OptionList();
+      }else{
+        list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
+      }
+      this.sessionOptions = new FragmentOptionsManager(context.getOptionManager(), list);
+    }catch(Exception e){
+      throw new ExecutionSetupException("Failure while reading plan options.", e);
+    }
     this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax());
   }
 
+  public OptionManager getOptions(){
+    return sessionOptions;
+  }
+
   public void setBuffers(IncomingBuffers buffers) {
     this.buffers = buffers;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 7e3b63d..f6ce04f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,6 +23,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.tools.Frameworks;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExpressionParsingException;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -33,7 +34,10 @@ import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.eigenbase.sql.SqlLiteral;
 
 public class QueryContext{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
@@ -52,7 +56,7 @@ public class QueryContext{
     this.workBus = drllbitContext.getWorkBus();
     this.session = session;
     this.timer = new Multitimer<>(QuerySetup.class);
-    this.plannerSettings = new PlannerSettings();
+    this.plannerSettings = new PlannerSettings(session.getOptions());
   }
 
   public PlannerSettings getPlannerSettings(){
@@ -79,6 +83,9 @@ public class QueryContext{
     return rootSchema;
   }
 
+  public OptionManager getOptions(){
+    return session.getOptions();
+  }
   public DrillbitEndpoint getCurrentEndpoint(){
     return drillbitContext.getEndpoint();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig
new file mode 100644
index 0000000..4f050a6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java.orig
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.Collection;
+
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.Frameworks;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+public class QueryContext{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+
+  private final QueryId queryId;
+  private final DrillbitContext drillbitContext;
+  private final WorkEventBus workBus;
+  private UserSession session;
+  public final Multitimer<QuerySetup> timer;
+  private final PlannerSettings plannerSettings;
+
+  public QueryContext(UserSession session, QueryId queryId, DrillbitContext drllbitContext) {
+    super();
+    this.queryId = queryId;
+    this.drillbitContext = drllbitContext;
+    this.workBus = drllbitContext.getWorkBus();
+    this.session = session;
+    this.timer = new Multitimer<>(QuerySetup.class);
+    this.plannerSettings = new PlannerSettings();
+  }
+
+  public PlannerSettings getPlannerSettings(){
+    return plannerSettings;
+  }
+
+  public UserSession getSession(){
+    return session;
+  }
+
+  public SchemaPlus getNewDefaultSchema(){
+    SchemaPlus rootSchema = getRootSchema();
+    SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
+    if(defaultSchema == null){
+      return rootSchema;
+    }else{
+      return defaultSchema;
+    }
+  }
+
+  public SchemaPlus getRootSchema(){
+    SchemaPlus rootSchema = Frameworks.createRootSchema();
+    drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), rootSchema);
+    return rootSchema;
+  }
+
+<<<<<<< HEAD
+=======
+  public Object getOptionValue(String name){
+    return drillbitContext.getGlobalDrillOptions().getOption(name);
+  }
+
+  public void setOptionValue(String name, String value) throws ExpressionParsingException {
+    drillbitContext.getGlobalDrillOptions().setOption(name, value);
+  }
+  
+>>>>>>> Drill 381 - implementation of session and global options.
+  public DrillbitEndpoint getCurrentEndpoint(){
+    return drillbitContext.getEndpoint();
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public StoragePluginRegistry getStorage(){
+    return drillbitContext.getStorage();
+  }
+
+
+  public DistributedCache getCache(){
+    return drillbitContext.getCache();
+  }
+
+  public Collection<DrillbitEndpoint> getActiveEndpoints(){
+    return drillbitContext.getBits();
+  }
+
+  public PhysicalPlanReader getPlanReader(){
+    return drillbitContext.getPlanReader();
+  }
+
+  public DataConnectionCreator getDataConnectionsPool(){
+    return drillbitContext.getDataConnectionsPool();
+  }
+
+  public DrillConfig getConfig(){
+    return drillbitContext.getConfig();
+  }
+
+  public WorkEventBus getWorkBus(){
+    return workBus;
+  }
+
+  public FunctionImplementationRegistry getFunctionRegistry(){
+    return drillbitContext.getFunctionImplementationRegistry();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index d7d5ccb..78c0e3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -53,8 +54,8 @@ import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.config.HashAggregate;
-import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.eigenbase.rel.RelFieldCollation.Direction;
 import org.eigenbase.rel.RelFieldCollation.NullDirection;
@@ -62,13 +63,46 @@ import org.eigenbase.rel.RelFieldCollation.NullDirection;
 import com.beust.jcommander.internal.Lists;
 
 public class BasicOptimizer extends Optimizer{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
 
   private DrillConfig config;
   private QueryContext context;
+  private UserServer.UserClientConnection userSession;
 
-  public BasicOptimizer(DrillConfig config, QueryContext context){
+  public BasicOptimizer(DrillConfig config, QueryContext context, UserServer.UserClientConnection userSession){
     this.config = config;
     this.context = context;
+    this.userSession = userSession;
+    logCurrentOptionValues();
+  }
+
+  private void logCurrentOptionValues(){
+//    Iterator<DrillOptionValue> optionVals = userSession.getSessionOptionIterator();
+//    DrillOptionValue val = null;
+//    String output = "";
+//    output += "SessionOptions: {\n";
+//    for ( ;optionVals.hasNext(); val = optionVals.next()){
+//      if (val != null) {
+//        output += val.getOptionName() + ":" + val.getValue() + ",\n";
+//      }
+//    }
+//    output += "}";
+//    logger.debug(output);
+  }
+
+  /**
+   * Get the current value of an option. Session options override global options.
+   *
+   * @param name - the name of the option
+   * @return - value of the option
+   */
+  private Object getOptionValue(String name) {
+//    Object val = userSession.getSessionLevelOption(name);
+//    if (val == null) {
+//      context.getOptionValue(name);
+//    }
+//    return val;
+    return null;
   }
 
   @Override
@@ -90,7 +124,8 @@ public class BasicOptimizer extends Optimizer{
     PlanProperties props = PlanProperties.builder()
         .type(PlanProperties.PlanType.APACHE_DRILL_PHYSICAL)
         .version(plan.getProperties().version)
-        .generator(plan.getProperties().generator).build();
+        .generator(plan.getProperties().generator)
+        .options(new JSONOptions(context.getOptions().getSessionOptionList())).build();
     PhysicalPlan p = new PhysicalPlan(props, physOps);
     return p;
     //return new PhysicalPlan(props, physOps);
@@ -103,10 +138,20 @@ public class BasicOptimizer extends Optimizer{
 
   public static class BasicOptimizationContext implements OptimizationContext {
 
+    private OptionManager ops;
+    public BasicOptimizationContext(QueryContext c){
+      this.ops = c.getOptions();
+    }
+
     @Override
     public int getPriority() {
       return 1;
     }
+
+    @Override
+    public OptionManager getOptions() {
+      return ops;
+    }
   }
 
   private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
@@ -121,10 +166,10 @@ public class BasicOptimizer extends Optimizer{
 
     @Override
     public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException {
-      
+
       List<Ordering> orderDefs = Lists.newArrayList();
 
-      
+
       PhysicalOperator input = groupBy.getInput().accept(this, value);
 
       if(groupBy.getKeys().length > 0){
@@ -133,7 +178,7 @@ public class BasicOptimizer extends Optimizer{
         }
         input = new Sort(input, orderDefs, false);
       }
-      
+
       StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
       return sa;
     }
@@ -165,7 +210,7 @@ public class BasicOptimizer extends Optimizer{
       }
       leftOp = new Sort(leftOp, leftOrderDefs, false);
       leftOp = new SelectionVectorRemover(leftOp);
-      
+
       PhysicalOperator rightOp = join.getRight().accept(this, value);
       List<Ordering> rightOrderDefs = Lists.newArrayList();
       for(JoinCondition jc : join.getConditions()){
@@ -173,7 +218,7 @@ public class BasicOptimizer extends Optimizer{
       }
       rightOp = new Sort(rightOp, rightOrderDefs, false);
       rightOp = new SelectionVectorRemover(rightOp);
-      
+
       MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType());
       return new SelectionVectorRemover(mjp);
     }
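
The commented-out getOptionValue() earlier in this diff documents the intended precedence: a session-level option, when set, wins over the system-wide (global) value. Note that the commented-out draft never assigns the system fallback before returning. A minimal sketch of the intended lookup, using plain maps as stand-ins for Drill's session and system option stores (names here are illustrative, not Drill's API):

    import java.util.HashMap;
    import java.util.Map;

    class OptionPrecedenceSketch {
      private final Map<String, Object> sessionOptions = new HashMap<>();
      private final Map<String, Object> systemOptions = new HashMap<>();

      // Session value if present, otherwise fall back to the system-wide value.
      Object getOptionValue(String name) {
        Object val = sessionOptions.get(name);
        return val != null ? val : systemOptions.get(name);
      }
    }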

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
index c7069c3..34d0622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
@@ -24,23 +24,25 @@ import org.apache.drill.common.exceptions.DrillConfigurationException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.server.options.OptionManager;
 
 public abstract class Optimizer implements Closeable{
-  
+
   public static String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
-  
+
   public abstract void init(DrillConfig config);
-  
+
   public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException;
   public abstract void close();
-  
+
   public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{
     Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
     o.init(config);
     return o;
   }
-  
+
   public interface OptimizationContext{
     public int getPriority();
+    public OptionManager getOptions();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 167a992..8d77136 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -30,14 +30,14 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.MajorTypeSerDe;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.store.StoragePluginRegistry;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StoragePluginRegistry;
 
 public class PhysicalPlanReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
@@ -56,8 +56,8 @@ public class PhysicalPlanReader {
         .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) //
         .addSerializer(MajorType.class, new MajorTypeSerDe.Se())
         .addDeserializer(MajorType.class, new MajorTypeSerDe.De());
-        
-        
+
+
     mapper.registerModule(deserModule);
     mapper.registerSubtypes(PhysicalOperatorUtil.getSubTypes(config));
     InjectableValues injectables = new InjectableValues.Std() //
@@ -78,10 +78,14 @@ public class PhysicalPlanReader {
     this(config, mapper, endpoint, null);
   }
 
+  public String writeJson(OptionList list) throws JsonProcessingException{
+    return mapper.writeValueAsString(list);
+  }
+
   public String writeJson(PhysicalOperator op) throws JsonProcessingException{
     return mapper.writeValueAsString(op);
   }
-  
+
   public PhysicalPlan readPhysicalPlan(String json) throws JsonProcessingException, IOException {
     logger.debug("Reading physical plan {}", json);
     return physicalPlanReader.readValue(json);
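
The new writeJson(OptionList) overload simply routes the option list through the same configured ObjectMapper used for operators. As a rough stand-alone illustration of the idea with plain Jackson (OptionValue below is an illustrative POJO, not Drill's option class):

    import java.util.Arrays;
    import java.util.List;

    import com.fasterxml.jackson.databind.ObjectMapper;

    class OptionJsonSketch {
      public static class OptionValue {               // stand-in for a single option entry
        public String name;
        public String value;
        public OptionValue() {}
        public OptionValue(String name, String value) { this.name = name; this.value = value; }
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        List<OptionValue> options = Arrays.asList(new OptionValue("planner.disable_exchanges", "true"));
        // Prints: [{"name":"planner.disable_exchanges","value":"true"}]
        System.out.println(mapper.writeValueAsString(options));
      }
    }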

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
index dff2dd3..4da6500 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
 
 /**
@@ -34,25 +35,25 @@ import org.apache.drill.exec.work.QueryWorkUnit;
  */
 public class SimpleExecPlanner implements ExecPlanner{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleExecPlanner.class);
-  
+
   private MakeFragmentsVisitor fragmenter = new MakeFragmentsVisitor();
   private SimpleParallelizer parallelizer = new SimpleParallelizer();
 
   @Override
   public QueryWorkUnit getWorkUnit(QueryContext context, PhysicalPlan plan, int maxWidth) throws ExecutionSetupException {
-    
+
     // get the root physical operator and split the plan into sub fragments.
     PhysicalOperator root = plan.getSortedOperators(false).iterator().next();
     Fragment fragmentRoot = root.accept(fragmenter, null);
-    
+
     // generate a planning set and collect stats.
     PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
 
     int maxWidthPerEndpoint = context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT);
-    
-    return parallelizer.getFragments(context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(),
+
+    return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(),
             context.getPlanReader(), fragmentRoot, planningSet, maxWidth, maxWidthPerEndpoint);
-    
-    
+
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 6e951df..c34869d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
 import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -31,6 +32,7 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -67,13 +69,13 @@ public class SimpleParallelizer {
    * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
    * @throws ExecutionSetupException
    */
-  public QueryWorkUnit getFragments(DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
+  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
                                     int globalMaxWidth, int maxWidthPerEndpoint) throws ExecutionSetupException {
     assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, maxWidthPerEndpoint);
-    return generateWorkUnit(foremanNode, queryId, reader, rootNode, planningSet);
+    return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet);
   }
 
-  private QueryWorkUnit generateWorkUnit(DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
+  private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
                                          PlanningSet planningSet) throws ExecutionSetupException {
 
     List<PlanFragment> fragments = Lists.newArrayList();
@@ -109,8 +111,10 @@ public class SimpleParallelizer {
 
         // get plan as JSON
         String plan;
+        String optionsData;
         try {
           plan = reader.writeJson(root);
+          optionsData = reader.writeJson(options);
         } catch (JsonProcessingException e) {
           throw new FragmentSetupException("Failure while trying to convert fragment into json.", e);
         }
@@ -135,6 +139,7 @@ public class SimpleParallelizer {
             .setTimeZone(timeZone)//
             .setMemInitial(wrapper.getInitialAllocation())//
             .setMemMax(wrapper.getMaxAllocation())
+            .setOptionsJson(optionsData)
             .build();
 
         if (isRootNode) {
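
With setOptionsJson(...) the serialized option list rides along inside every PlanFragment, so the drillbit that executes a fragment can rebuild the same settings the foreman planned with. A hedged sketch of the receiving side, reusing the OptionValue stand-in from the PhysicalPlanReader sketch above (the actual OptionList type and fragment wiring are Drill internals not shown here):

    import java.util.List;

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    class FragmentOptionsSketch {
      static List<OptionJsonSketch.OptionValue> readOptions(String optionsJson) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Parse the JSON the foreman attached to the fragment back into option entries.
        return mapper.readValue(optionsJson,
            new TypeReference<List<OptionJsonSketch.OptionValue>>() {});
      }
    }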

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
index 939b77c..d296eb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
@@ -26,21 +26,22 @@ import java.util.Map.Entry;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.cache.JacksonSerializable;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 
-public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>>{
-  
+public class StoragePlugins extends JacksonSerializable implements Iterable<Map.Entry<String, StoragePluginConfig>>{
+
   private Map<String, StoragePluginConfig> storage;
-  
+
   @JsonCreator
   public StoragePlugins(@JsonProperty("storage") Map<String, StoragePluginConfig> storage){
     this.storage = storage;
   }
-  
+
   public static void main(String[] args) throws Exception{
     DrillConfig config = DrillConfig.create();
     String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
@@ -88,6 +89,6 @@ public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginC
     }
     return storage.equals(((StoragePlugins) obj).getStorage());
   }
-  
-  
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index e6e99c0..5e49451 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -19,17 +19,24 @@ package org.apache.drill.exec.planner.physical;
 
 import net.hydromatic.optiq.tools.FrameworkContext;
 
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValidator;
+import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
+
 public class PlannerSettings implements FrameworkContext{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class);
 
-  private boolean singleMode;
 
-  public boolean isSingleMode() {
-    return singleMode;
+  public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
+
+  public OptionManager options;
+
+  public PlannerSettings(OptionManager options){
+    this.options = options;
   }
 
-  public void setSingleMode(boolean singleMode) {
-    this.singleMode = singleMode;
+  public boolean isSingleMode() {
+    return options.getOption(EXCHANGE.getOptionName()).bool_val;
   }
 
   @Override
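
PlannerSettings no longer carries its own mutable singleMode flag; isSingleMode() now reads the boolean option registered by the BooleanValidator for "planner.disable_exchanges" (default false). A minimal sketch of the typed-validator idea with hypothetical names, not Drill's actual TypeValidators:

    import java.util.Map;

    class BooleanOptionSketch {
      final String name;
      final boolean defaultValue;

      BooleanOptionSketch(String name, boolean defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
      }

      // Reject anything that is not a boolean literal.
      boolean validate(String raw) {
        if (!"true".equalsIgnoreCase(raw) && !"false".equalsIgnoreCase(raw)) {
          throw new IllegalArgumentException(name + " requires a boolean value, got: " + raw);
        }
        return Boolean.parseBoolean(raw);
      }

      // Current value from the option store, falling back to the declared default.
      boolean get(Map<String, Boolean> store) {
        Boolean v = store.get(name);
        return v != null ? v : defaultValue;
      }
    }

With a validator like this, isSingleMode() collapses to a single lookup of planner.disable_exchanges.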

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 0e9f798..116017c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.tools.Planner;
 import net.hydromatic.optiq.tools.RelConversionException;
 import net.hydromatic.optiq.tools.ValidationException;
 
+import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.PlanProperties;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder;
@@ -34,12 +35,10 @@ import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillStoreRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.eigenbase.rel.RelNode;
@@ -132,6 +131,7 @@ public class DefaultSqlHandler implements SqlHandler{
     PlanPropertiesBuilder propsBuilder = PlanProperties.builder();
     propsBuilder.type(PlanType.APACHE_DRILL_PHYSICAL);
     propsBuilder.version(1);
+    propsBuilder.options(new JSONOptions(context.getOptions().getSessionOptionList()));
     propsBuilder.resultMode(ResultMode.EXEC);
     propsBuilder.generator(this.getClass().getSimpleName(), "");
     return new PhysicalPlan(propsBuilder.build(), getPops(op));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index dec9222..86ce6c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -59,7 +59,7 @@ public class ExplainHandler extends DefaultSqlHandler{
     log("Drill Logical", drel);
 
     if(mode == ResultMode.LOGICAL){
-      LogicalExplain logicalResult = new LogicalExplain(drel);
+      LogicalExplain logicalResult = new LogicalExplain(drel, level, context);
       return DirectPlan.createDirectPlan(context, logicalResult);
     }
 
@@ -68,7 +68,7 @@ public class ExplainHandler extends DefaultSqlHandler{
     PhysicalOperator pop = convertToPop(prel);
     PhysicalPlan plan = convertToPlan(pop);
     log("Drill Plan", plan);
-    PhysicalExplain physicalResult = new PhysicalExplain(prel, plan);
+    PhysicalExplain physicalResult = new PhysicalExplain(prel, plan, level, context);
     return DirectPlan.createDirectPlan(context, physicalResult);
   }
 
@@ -93,11 +93,11 @@ public class ExplainHandler extends DefaultSqlHandler{
   }
 
 
-  public class LogicalExplain{
+  public static class LogicalExplain{
     public String text;
     public String json;
 
-    public LogicalExplain(RelNode node){
+    public LogicalExplain(RelNode node, SqlExplainLevel level, QueryContext context){
       this.text = RelOptUtil.toString(node, level);
       DrillImplementor implementor = new DrillImplementor(new DrillParseContext(), ResultMode.LOGICAL);
       implementor.go( (DrillRel) node);
@@ -106,11 +106,11 @@ public class ExplainHandler extends DefaultSqlHandler{
     }
   }
 
-  public class PhysicalExplain{
+  public static class PhysicalExplain{
     public String text;
     public String json;
 
-    public PhysicalExplain(RelNode node, PhysicalPlan plan){
+    public PhysicalExplain(RelNode node, PhysicalPlan plan, SqlExplainLevel level, QueryContext context){
       this.text = RelOptUtil.toString(node, level);
       this.json = plan.unparse(context.getConfig().getMapper().writer());
     }
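
LogicalExplain and PhysicalExplain become static nested classes and receive level and context through their constructors instead of capturing them from the enclosing handler. A non-static inner class keeps an implicit reference to its outer instance, which gets in the way of creating or serializing the result object on its own. A generic illustration of the difference (not Drill code):

    class Outer {
      private final int level = 3;

      // Inner class: silently captures the enclosing Outer instance.
      class Inner {
        int describe() { return level; }          // reads Outer.this.level
      }

      // Static nested class: every input is explicit, so it stands alone.
      static class Nested {
        private final int level;
        Nested(int level) { this.level = level; }
        int describe() { return level; }
      }
    }

new Outer.Nested(3) works without an Outer around it, whereas an Inner can only be created as someOuter.new Inner().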

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index 64be891..ae3960a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.tools.ValidationException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.eigenbase.sql.SqlLiteral;
 import org.eigenbase.sql.SqlNode;
 import org.eigenbase.sql.SqlSetOption;
 
@@ -46,10 +47,23 @@ public class SetOptionHandler implements SqlHandler{
     String scope = option.getScope();
     String name = option.getName();
     SqlNode value = option.getValue();
-    if(name.equals("NO_EXCHANGES")){
-      context.getPlannerSettings().setSingleMode(true);
+    if(value instanceof SqlLiteral){
+      switch(scope.toLowerCase()){
+      case "session":
+        context.getOptions().setOption(option.getName(), (SqlLiteral) value);
+        break;
+      case "system":
+        context.getOptions().getSystemManager().setOption(name, (SqlLiteral) value);
+        break;
+      default:
+        throw new ValidationException("Invalid OPTION scope.  Scope must be SESSION or SYSTEM.");
+      }
+
+    }else{
+      throw new ValidationException("Sql options can only be literals.");
     }
-    return DirectPlan.createDirectPlan(context, true, "disabled exchanges.");
+
+    return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", name));
 
   }
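
The rewritten handler accepts only literal values and dispatches on the declared scope: session-scoped options go to the session OptionManager, system-scoped ones to the system manager, and any other scope is rejected. A compact sketch of that dispatch, with plain maps standing in for the two managers (illustrative only):

    import java.util.Map;

    class SetOptionSketch {
      static void setOption(String scope, String name, String value,
                            Map<String, String> session, Map<String, String> system) {
        switch (scope.toLowerCase()) {
        case "session":
          session.put(name, value);
          break;
        case "system":
          system.put(name, value);
          break;
        default:
          throw new IllegalArgumentException("Invalid OPTION scope. Scope must be SESSION or SYSTEM.");
        }
      }
    }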
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
index 6dd7e7b..07d26b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SimpleCommandResult.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.planner.sql.handlers;
 
 public class SimpleCommandResult {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleCommandResult.class);
 
   public boolean ok;
   public String summary;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index acd8412..3ee25d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,13 +17,13 @@
  */
 package org.apache.drill.exec.rpc.user;
 
-import java.io.IOException;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 
+import java.io.IOException;
+
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -100,12 +100,13 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   public class UserClientConnection extends RemoteConnection {
 
     private UserSession session;
+
     public UserClientConnection(Channel channel) {
       super(channel);
     }
 
     void setUser(UserCredentials credentials, UserProperties props) throws IOException{
-      session = new UserSession(credentials, props);
+      session = new UserSession(worker.getSystemOptions(), credentials, props);
     }
 
     public UserSession getSession(){
@@ -114,7 +115,6 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
     public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
 //      logger.debug("Sending result to client with {}", result);
-
       send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
     }
 
@@ -122,6 +122,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
     public BufferAllocator getAllocator() {
       return alloc;
     }
+
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index f27317c..86c4bad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -18,14 +18,18 @@
 package org.apache.drill.exec.rpc.user;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
 import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+
+import com.google.common.collect.Maps;
 
 public class UserSession {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class);
@@ -36,10 +40,11 @@ public class UserSession {
   private boolean enableExchanges = true;
   private UserCredentials credentials;
   private Map<String, String> properties;
+  private OptionManager options;
 
-  public UserSession(UserCredentials credentials, UserProperties properties) throws IOException{
+  public UserSession(OptionManager systemOptions, UserCredentials credentials, UserProperties properties) throws IOException{
     this.credentials = credentials;
-
+    this.options = new SessionOptionManager(systemOptions);
     this.properties = Maps.newHashMap();
     if (properties == null) return;
     for (int i=0; i<properties.getPropertiesCount(); i++) {
@@ -48,6 +53,10 @@ public class UserSession {
     }
   }
 
+  public OptionManager getOptions(){
+    return options;
+  }
+
   public DrillUser getUser(){
     return user;
   }
@@ -95,4 +104,5 @@ public class UserSession {
     }
     return schema;
   }
+
 }
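
Each UserSession now builds its own SessionOptionManager on top of the drillbit-wide system options, so a session-scoped SET is visible only to that connection while system values stay shared. A simplified model of that layering (maps stand in for the managers; this is not Drill's implementation):

    import java.util.HashMap;
    import java.util.Map;

    class SessionLayeringSketch {
      // One shared system-level store per drillbit.
      static final Map<String, String> SYSTEM = new HashMap<>();

      // One of these per user connection.
      static class Session {
        final Map<String, String> overrides = new HashMap<>();
        String get(String name) {
          String v = overrides.get(name);
          return v != null ? v : SYSTEM.get(name);
        }
      }

      public static void main(String[] args) {
        SYSTEM.put("planner.disable_exchanges", "false");
        Session a = new Session();
        Session b = new Session();
        a.overrides.put("planner.disable_exchanges", "true");     // session-scoped SET on connection a
        System.out.println(a.get("planner.disable_exchanges"));   // true
        System.out.println(b.get("planner.disable_exchanges"));   // false: other sessions are unaffected
      }
    }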

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 411a76c..7f607a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
 import org.apache.drill.exec.cache.HazelCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
@@ -96,6 +97,7 @@ public class Drillbit implements Closeable{
     manager.start(md, cache, engine.getController(), engine.getDataConnectionCreator(), coord);
     cache.run();
     manager.getContext().getStorage().init();
+    manager.getContext().getOptionManager().init();
     handle = coord.register(md);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig
new file mode 100644
index 0000000..5e72717
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java.orig
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.cache.HazelCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
+import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.work.WorkManager;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Starts, tracks and stops all the required services for a Drillbit daemon to work.
+ */
+public class Drillbit implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
+
+  public static Drillbit start(StartupOptions options) throws DrillbitStartupException {
+    return start(DrillConfig.create(options.getConfigLocation()));
+  }
+
+  public static Drillbit start(DrillConfig config) throws DrillbitStartupException {
+    Drillbit bit;
+    try {
+      logger.debug("Setting up Drillbit.");
+      bit = new Drillbit(config, null);
+    } catch (Exception ex) {
+      throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
+    }
+    try {
+      logger.debug("Starting Drillbit.");
+      bit.run();
+    } catch (Exception e) {
+      throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
+    }
+    return bit;
+  }
+
+  public static void main(String[] cli) throws DrillbitStartupException {
+    StartupOptions options = StartupOptions.parse(cli);
+    start(options);
+  }
+
+  final ClusterCoordinator coord;
+  final ServiceEngine engine;
+  final DistributedCache cache;
+  final WorkManager manager;
+  final BootStrapContext context;
+
+  private volatile RegistrationHandle handle;
+
+  public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
+    if(serviceSet != null){
+      this.context = new BootStrapContext(config);
+      this.manager = new WorkManager(context);
+      this.coord = serviceSet.getCoordinator();
+      this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler());
+      this.cache = serviceSet.getCache();
+    }else{
+      Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
+      this.context = new BootStrapContext(config);
+      this.manager = new WorkManager(context);
+      this.coord = new ZKClusterCoordinator(config);
+      this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler());
+      this.cache = new HazelCache(config, context.getAllocator());
+    }
+  }
+
+  public void run() throws Exception {
+    coord.start(10000);
+    DrillbitEndpoint md = engine.start();
+    cache.run();
+<<<<<<< HEAD
+    manager.getContext().getStorage().init();
+=======
+    manager.start(md, cache, engine.getController(), engine.getDataConnectionCreator(), coord);
+>>>>>>> Drill 381 - implementation of session and global options.
+    handle = coord.register(md);
+  }
+
+  public void close() {
+    if (coord != null) coord.unregister(handle);
+
+    try {
+      Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted while sleeping during coordination deregistration.");
+    }
+
+    Closeables.closeQuietly(engine);
+    Closeables.closeQuietly(coord);
+    Closeables.closeQuietly(manager);
+    Closeables.closeQuietly(context);
+    logger.info("Shutdown completed.");
+  }
+
+  private class ShutdownThread extends Thread {
+    ShutdownThread(DrillConfig config) {
+      this.setName("ShutdownHook");
+    }
+
+    @Override
+    public void run() {
+      logger.info("Received shutdown request.");
+      close();
+    }
+
+  }
+  public ClusterCoordinator getCoordinator(){
+    return coord;
+  }
+
+  public DrillbitContext getContext(){
+    return this.manager.getContext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 1fd30e3..f0a47f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -22,7 +22,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import java.util.Collection;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -33,9 +32,10 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
-import org.apache.drill.exec.store.StoragePlugin;
 
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Preconditions;
@@ -55,6 +55,7 @@ public class DrillbitContext {
   private final Controller controller;
   private final WorkEventBus workBus;
   private final FunctionImplementationRegistry functionRegistry;
+  private final SystemOptionManager systemOptions;
 
   public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
     super();
@@ -73,6 +74,9 @@ public class DrillbitContext {
     this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins);
     this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
     this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+    this.systemOptions = new SystemOptionManager(cache);
+
+//    this.globalDrillOptions = new DistributedGlobalOptions(this.cache);
   }
 
   public FunctionImplementationRegistry getFunctionImplementationRegistry() {
@@ -83,6 +87,10 @@ public class DrillbitContext {
     return workBus;
   }
 
+  public SystemOptionManager getOptionManager() {
+    return systemOptions;
+  }
+
   public DrillbitEndpoint getEndpoint(){
     return endpoint;
   }


[3/6] git commit: DRILL-645: Correct string encodings in code template

Posted by ja...@apache.org.
DRILL-645: Correct string encodings in code template


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9b34f23d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9b34f23d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9b34f23d

Branch: refs/heads/master
Commit: 9b34f23dd8291c7c4c88e0421d02d56e65ae0cd2
Parents: 0f65fd8
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue May 6 02:42:59 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 6 15:40:38 2014 -0700

----------------------------------------------------------------------
 .../codegen/templates/CastFunctionsSrcVarLen.java   | 16 ++++++++--------
 .../src/main/codegen/templates/CastVarCharDate.java |  2 +-
 .../main/codegen/templates/CastVarCharInterval.java |  2 +-
 .../DateToCharFunctions.java                        |  2 +-
 .../ToDateTypeFunctions.java                        |  4 ++--
 .../templates/Decimal/CastVarCharDecimal.java       | 10 +++++-----
 .../codegen/templates/ObjectInspectorHelper.java    |  4 +++-
 .../src/main/codegen/templates/ValueHolders.java    | 10 +++++++++-
 .../codegen/templates/VariableLengthVectors.java    | 12 ++++++++++--
 .../drill/exec/physical/impl/TestHiveUDFs.java      | 14 +++++++-------
 10 files changed, 47 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
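
DRILL-645 replaces every new String(bytes) in the generated code with new String(bytes, Charsets.UTF_8), and UTF-16 where the data really is UTF-16. The no-charset constructor decodes with the JVM's platform default, so the same bytes can render differently from one machine or locale setting to the next. A small self-contained demonstration (it uses Guava's Charsets, as the templates do):

    import com.google.common.base.Charsets;

    class CharsetDemo {
      public static void main(String[] args) {
        byte[] bytes = "Grüße".getBytes(Charsets.UTF_8);

        // Platform-dependent: prints mojibake on a JVM whose default charset is not UTF-8.
        System.out.println(new String(bytes));

        // Deterministic: always decodes the UTF-8 bytes correctly.
        System.out.println(new String(bytes, Charsets.UTF_8));
      }
    }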


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java b/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
index 5855381..aa216cd 100644
--- a/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
+++ b/exec/java-exec/src/main/codegen/templates/CastFunctionsSrcVarLen.java
@@ -21,7 +21,7 @@
   { 
     byte[] buf = new byte[in.end - in.start];
     in.buffer.getBytes(in.start, buf, 0, in.end - in.start);  
-    throw new NumberFormatException(new String(buf));
+    throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
   }  
 </#macro>
 
@@ -58,7 +58,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
       in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
     
       //TODO: need capture format exception, and issue SQLERR code.
-      out.value = ${type.javaType}.parse${type.parse}(new String(buf));
+      out.value = ${type.javaType}.parse${type.parse}(new String(buf, com.google.common.base.Charsets.UTF_8));
       
     <#elseif type.to=="Int" || type.to == "BigInt">
 
@@ -66,7 +66,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
         //empty, not a valid number
         byte[] buf = new byte[in.end - in.start];
         in.buffer.getBytes(in.start, buf, 0, in.end - in.start);  
-        throw new NumberFormatException(new String(buf));  
+        throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));  
       }
 
       int readIndex = in.start;
@@ -77,7 +77,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
         //only one single '-'
         byte[] buf = new byte[in.end - in.start];
         in.buffer.getBytes(in.start, buf, 0, in.end - in.start);  
-        throw new NumberFormatException(new String(buf));  
+        throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));  
       }
    
       int radix = 10;
@@ -91,13 +91,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
         if (digit == -1) {
           byte[] buf = new byte[in.end - in.start];
           in.buffer.getBytes(in.start, buf, 0, in.end - in.start);  
-          throw new NumberFormatException(new String(buf));  
+          throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));  
         }
         //overflow
         if (max > result) {
           byte[] buf = new byte[in.end - in.start];
           in.buffer.getBytes(in.start, buf, 0, in.end - in.start);  
-          throw new NumberFormatException(new String(buf));  
+          throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));  
         }
         
         ${type.primeType} next = result * radix - digit;
@@ -106,7 +106,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
         if (next > result) {
           byte[] buf = new byte[in.end - in.start];
           in.buffer.getBytes(in.start, buf, 0, in.end - in.start);  
-          throw new NumberFormatException(new String(buf));  
+          throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));  
         }
         result = next;
       }
@@ -116,7 +116,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
         if (result < 0) {
           byte[] buf = new byte[in.end - in.start];
           in.buffer.getBytes(in.start, buf, 0, in.end - in.start);  
-          throw new NumberFormatException(new String(buf));  
+          throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));  
         }
       }
    

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
index 39b8b41..249b555 100644
--- a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
+++ b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
@@ -54,7 +54,7 @@ public class Cast${type.from}To${type.to} implements DrillSimpleFunc {
 
       byte[] buf = new byte[in.end - in.start];
       in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-      String input = new String(buf);
+      String input = new String(buf, com.google.common.base.Charsets.UTF_8);
 
       <#if type.to == "Date">
       org.joda.time.format.DateTimeFormatter f = org.apache.drill.exec.expr.fn.impl.DateUtility.getDateTimeFormatter();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java b/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
index ebb1bd9..135abdd 100644
--- a/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
+++ b/exec/java-exec/src/main/codegen/templates/CastVarCharInterval.java
@@ -55,7 +55,7 @@ public class Cast${type.from}To${type.to} implements DrillSimpleFunc {
 
       byte[] buf = new byte[in.end - in.start];
       in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-      String input = new String(buf);
+      String input = new String(buf, com.google.common.base.Charsets.UTF_8);
 
       // Parse the ISO format
       org.joda.time.Period period = org.joda.time.Period.parse(input);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
index cd5466c..326b1df 100644
--- a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/DateToCharFunctions.java
@@ -57,7 +57,7 @@ public class G${type}ToChar implements DrillSimpleFunc {
         // Get the desired output format and create a DateTimeFormatter
         byte[] buf = new byte[right.end - right.start];
         right.buffer.getBytes(right.start, buf, 0, right.end - right.start);
-        String input = new String(buf);
+        String input = new String(buf, com.google.common.base.Charsets.UTF_8);
         format = org.joda.time.format.DateTimeFormat.forPattern(input);
     }
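
Here the VarChar format argument is decoded as UTF-8 and handed straight to Joda-Time's pattern parser. A short stand-alone usage sketch of the same path (the pattern and timestamp are made up):

    import com.google.common.base.Charsets;

    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    class ToCharSketch {
      public static void main(String[] args) {
        byte[] patternBytes = "yyyy-MM-dd HH:mm".getBytes(Charsets.UTF_8); // as the bytes sit in the VarChar buffer
        String pattern = new String(patternBytes, Charsets.UTF_8);
        DateTimeFormatter format = DateTimeFormat.forPattern(pattern);
        System.out.println(format.print(new DateTime(2014, 5, 6, 15, 40))); // 2014-05-06 15:40
      }
    }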
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
index 30dfc89..fffaef4 100644
--- a/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/DateIntervalFunctionTemplates/ToDateTypeFunctions.java
@@ -50,7 +50,7 @@ public class GTo${type} implements DrillSimpleFunc {
         // Get the desired output format
         byte[] buf = new byte[right.end - right.start];
         right.buffer.getBytes(right.start, buf, 0, right.end - right.start);
-        String formatString = new String(buf);
+        String formatString = new String(buf, com.google.common.base.Charsets.UTF_8);
         format = org.joda.time.format.DateTimeFormat.forPattern(formatString);
     }
 
@@ -59,7 +59,7 @@ public class GTo${type} implements DrillSimpleFunc {
         // Get the input
         byte[] buf1 = new byte[left.end - left.start];
         left.buffer.getBytes(left.start, buf1, 0, left.end - left.start);
-        String input = new String(buf1);
+        String input = new String(buf1, com.google.common.base.Charsets.UTF_8);
 
         <#if type == "Date">
         out.value = (org.joda.time.DateMidnight.parse(input, format).withZoneRetainFields(org.joda.time.DateTimeZone.UTC)).getMillis();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
index b0214f4..e3eb973 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
@@ -107,7 +107,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
                 // not a valid digit
                 byte[] buf = new byte[in.end - in.start];
                 in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-                throw new org.apache.drill.common.exceptions.DrillRuntimeException(new String(buf));
+                throw new org.apache.drill.common.exceptions.DrillRuntimeException(new String(buf, com.google.common.base.Charsets.UTF_8));
             }
             out.value *= radix;
             out.value += next;
@@ -117,7 +117,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
         if (((integerEndIndex - integerStartIndex) + out.scale) > out.precision) {
             byte[] buf = new byte[in.end - in.start];
             in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-            throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf) + " Precision: " + out.precision +
+            throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf, com.google.common.base.Charsets.UTF_8) + " Precision: " + out.precision +
                                                                                " Total Digits: " + (out.scale + (integerEndIndex - integerStartIndex)));
         }
 
@@ -240,7 +240,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
                     // not a valid digit
                     byte[] buf = new byte[in.end - in.start];
                     in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-                    throw new NumberFormatException(new String(buf));
+                    throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
                 }
 
                 integerDigits++;
@@ -253,7 +253,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
         if (integerDigits + out.scale > out.precision) {
             byte[] buf = new byte[in.end - in.start];
             in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-            throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf) + " Precision: " + out.precision + " Total Digits: " + (out.scale + integerDigits));
+            throw new org.apache.drill.common.exceptions.DrillRuntimeException("Precision is insufficient for the provided input: " + new String(buf, com.google.common.base.Charsets.UTF_8) + " Precision: " + out.precision + " Total Digits: " + (out.scale + integerDigits));
         }
 
 
@@ -313,7 +313,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
                     // not a valid digit
                     byte[] buf = new byte[in.end - in.start];
                     in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-                    throw new NumberFormatException(new String(buf));
+                    throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
                 }
                 int value = (out.getInteger(decimalBufferIndex) * radix) + next;
                 out.setInteger(decimalBufferIndex, value);
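
A minimal standalone sketch of the behavior these hunks guard against: the single-argument new String(byte[]) decodes with the JVM's platform default charset, so the text embedded in the exception message can differ per machine, while the explicit-charset overload is deterministic. The class name is invented, and java.nio.charset.StandardCharsets is used only to keep the snippet free of the Guava dependency the patch itself uses.

    import java.nio.charset.StandardCharsets;

    public class CharsetPitfall {
      public static void main(String[] args) {
        // "1é" encoded as UTF-8: the digit plus a two-byte sequence for é.
        byte[] utf8 = "1\u00E9".getBytes(StandardCharsets.UTF_8);

        // Depends on the JVM's platform default charset, so the text can vary by machine/locale.
        String implicitDecode = new String(utf8);

        // Deterministic on every platform; this is the form the patch switches the messages to.
        String explicitDecode = new String(utf8, StandardCharsets.UTF_8);

        System.out.println("default charset: " + implicitDecode);
        System.out.println("explicit UTF-8 : " + explicitDecode);
      }
    }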

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
index 61078c7..3c72a96 100644
--- a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
@@ -23,8 +23,10 @@
 package org.apache.drill.exec.expr.fn.impl.hive;
 
 import com.sun.codemodel.*;
+
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.DirectExpression;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -153,7 +155,7 @@ public class ObjectInspectorHelper {
           <#elseif entry.hiveType == "STRING">
             JVar data = jc._else().decl(m.directClass(byte[].class.getCanonicalName()), "data",
               castedOI.invoke("getPrimitiveJavaObject").arg(returnValue)
-                      .invoke("getBytes"));
+                      .invoke("getBytes").arg(DirectExpression.direct("com.google.common.base.Charsets.UTF_16")));
             jc._else().add(returnValueHolder.ref("buffer")
               .invoke("setBytes").arg(JExpr.lit(0)).arg(data));
             jc._else().assign(returnValueHolder.ref("start"), JExpr.lit(0));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/ValueHolders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ValueHolders.java b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
index e36e655..7272f4f 100644
--- a/exec/java-exec/src/main/codegen/templates/ValueHolders.java
+++ b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
@@ -136,7 +136,15 @@ public final class ${className} implements ValueHolder{
       </#if>
         byte[] buf = new byte[end-start];
         buffer.getBytes(start, buf, 0, end-start);
-        return new String(buf);
+
+        <#switch minor.class>
+        <#case "Var16Char">
+        return new String(buf, com.google.common.base.Charsets.UTF_16);
+        <#break>
+        <#case "VarChar">
+        <#default>
+        return new String(buf, com.google.common.base.Charsets.UTF_8);
+        </#switch>
       }
 
       </#if>
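
A minimal standalone sketch (not part of the commit) of why the template now switches on the holder family: Var16Char data is stored as UTF-16 bytes and VarChar data as UTF-8, so toString() has to decode with the matching charset. The class name is invented; Charsets is the same Guava utility the template references.

    import com.google.common.base.Charsets;

    public class HolderCharsetSketch {
      public static void main(String[] args) {
        String value = "drill";

        byte[] var16CharBytes = value.getBytes(Charsets.UTF_16); // layout a Var16Char holder carries
        byte[] varCharBytes = value.getBytes(Charsets.UTF_8);    // layout a VarChar holder carries

        // Decoding with the matching charset round-trips cleanly.
        System.out.println(new String(var16CharBytes, Charsets.UTF_16)); // drill
        System.out.println(new String(varCharBytes, Charsets.UTF_8));    // drill

        // Decoding the UTF-16 bytes with anything else (UTF-8 here, or the platform default
        // that the old unqualified new String(buf) used) mangles the text.
        System.out.println(new String(var16CharBytes, Charsets.UTF_8));
      }
    }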

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 3905bce..6b05ec5 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -463,11 +463,19 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     @Override
     public void generateTestData(int size){
       boolean even = true;
+      <#switch minor.class>
+      <#case "Var16Char">
+      java.nio.charset.Charset charset = Charsets.UTF_16;
+      <#break>
+      <#case "VarChar">
+      <#default>
+      java.nio.charset.Charset charset = Charsets.UTF_8;
+      </#switch>
       for(int i =0; i < size; i++, even = !even){
         if(even){
-          set(i, new String("aaaaa").getBytes(Charsets.UTF_8));
+          set(i, new String("aaaaa").getBytes(charset));
         }else{
-          set(i, new String("bbbbbbbbbb").getBytes(Charsets.UTF_8));
+          set(i, new String("bbbbbbbbbb").getBytes(charset));
         }
       }
       setValueCount(size);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b34f23d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
index dff69e1..eabeda2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
@@ -109,28 +109,28 @@ public class TestHiveUDFs extends ExecTest {
       for(int i=0; i<exec.getRecordCount(); i++) {
 
 
-        String in = new String(str1V.getAccessor().get(i));
-        String upper = new String(upperStr1V.getAccessor().get(i));
+        String in = new String(str1V.getAccessor().get(i), Charsets.UTF_16);
+        String upper = new String(upperStr1V.getAccessor().get(i), Charsets.UTF_16);
         assertTrue(in.toUpperCase().equals(upper));
 
         long unix_timestamp = unix_timestampV.getAccessor().get(i);
 
-        String concat = new String(concatV.getAccessor().get(i));
+        String concat = new String(concatV.getAccessor().get(i), Charsets.UTF_16);
         assertTrue(concat.equals(in+"-"+in));
 
         float flt1 = flt1V.getAccessor().get(i);
-        String format_number = new String(format_numberV.getAccessor().get(i));
+        String format_number = new String(format_numberV.getAccessor().get(i), Charsets.UTF_16);
 
 
         String nullableStr1 = null;
         /* DRILL-425
         if (!nullableStr1V.getAccessor().isNull(i))
-          nullableStr1 = new String(nullableStr1V.getAccessor().get(i));*/
+          nullableStr1 = new String(nullableStr1V.getAccessor().get(i), Charsets.UTF_16);*/
 
         String upperNullableStr1 = null;
         /* DRILL-425
         if (!upperNullableStr1V.getAccessor().isNull(i))
-          upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i)); */
+          upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i), Charsets.UTF_16); */
 
         assertEquals(nullableStr1 != null, upperNullableStr1 != null);
         if (nullableStr1 != null)
@@ -184,7 +184,7 @@ public class TestHiveUDFs extends ExecTest {
 
       for(int i=0; i<exec.getRecordCount(); i++) {
 
-        String str1 = new String(str1V.getAccessor().get(i));
+        String str1 = new String(str1V.getAccessor().get(i), Charsets.UTF_16);
         int str1Length = str1LengthV.getAccessor().get(i);
         assertTrue(str1.length() == str1Length);
 


[5/6] DRILL-381: Implement SYSTEM and SESSION options.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
new file mode 100644
index 0000000..cdfcf60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Collection;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
+import org.apache.drill.exec.store.StoragePlugin;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
+
+public class DrillbitContext {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+
+  private final BootStrapContext context;
+
+  private PhysicalPlanReader reader;
+  private final ClusterCoordinator coord;
+  private final DataConnectionCreator connectionsPool;
+  private final DistributedCache cache;
+  private final DrillbitEndpoint endpoint;
+  private final StoragePluginRegistry storagePlugins;
+  private final OperatorCreatorRegistry operatorCreatorRegistry;
+  private final Controller controller;
+  private final WorkEventBus workBus;
+  private final FunctionImplementationRegistry functionRegistry;
+  private final DistributedGlobalOptions globalDrillOptions;
+
+  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
+    super();
+    Preconditions.checkNotNull(endpoint);
+    Preconditions.checkNotNull(context);
+    Preconditions.checkNotNull(controller);
+    Preconditions.checkNotNull(connectionsPool);
+    this.workBus = workBus;
+    this.controller = controller;
+    this.context = context;
+    this.coord = coord;
+    this.connectionsPool = connectionsPool;
+    this.cache = cache;
+    this.endpoint = endpoint;
+    this.storagePlugins = new StoragePluginRegistry(this);
+    this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins);
+    this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
+    this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+    this.globalDrillOptions = new DistributedGlobalOptions(this.cache);
+  }
+
+  public FunctionImplementationRegistry getFunctionImplementationRegistry() {
+    return functionRegistry;
+  }
+
+  public WorkEventBus getWorkBus(){
+    return workBus;
+  }
+
+  public DistributedGlobalOptions getGlobalDrillOptions() {
+    return globalDrillOptions;
+  }
+
+  public DrillbitEndpoint getEndpoint(){
+    return endpoint;
+  }
+
+  public DrillConfig getConfig() {
+    return context.getConfig();
+  }
+
+  public Collection<DrillbitEndpoint> getBits(){
+    return coord.getAvailableEndpoints();
+  }
+
+  public BufferAllocator getAllocator(){
+    return context.getAllocator();
+  }
+
+  public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+    return operatorCreatorRegistry;
+  }
+
+  public StoragePluginRegistry getStorage(){
+    return this.storagePlugins;
+  }
+
+  public NioEventLoopGroup getBitLoopGroup(){
+    return context.getBitLoopGroup();
+  }
+
+
+  public DataConnectionCreator getDataConnectionsPool(){
+    return connectionsPool;
+  }
+
+  public Controller getController(){
+    return controller;
+  }
+
+  public MetricRegistry getMetrics(){
+    return context.getMetrics();
+  }
+
+  public DistributedCache getCache(){
+    return cache;
+  }
+
+  public PhysicalPlanReader getPlanReader(){
+    return reader;
+  }
+
+  public DrillSchemaFactory getSchemaFactory(){
+    return storagePlugins.getSchemaFactory();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
new file mode 100644
index 0000000..e4b03d3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+
+import com.typesafe.config.ConfigValue;
+
+public class DrillConfigIterator implements Iterable<OptionValue> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfigIterator.class);
+
+  DrillConfig c;
+  public DrillConfigIterator(DrillConfig c){
+    this.c = c;
+  }
+
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return new Iter(c);
+  }
+
+  public class Iter implements Iterator<OptionValue>{
+
+    Iterator<Entry<String, ConfigValue>> entries;
+    public Iter(DrillConfig c){
+      entries = c.entrySet().iterator();
+    }
+    @Override
+    public boolean hasNext() {
+      return entries.hasNext();
+    }
+
+    @Override
+    public OptionValue next() {
+      Entry<String, ConfigValue> e = entries.next();
+      OptionValue v = new OptionValue();
+      v.name = e.getKey();
+      ConfigValue cv = e.getValue();
+      v.type = OptionType.BOOT;
+      switch(cv.valueType()){
+      case BOOLEAN:
+        v.kind = Kind.BOOLEAN;
+        v.bool_val = (Boolean) cv.unwrapped();
+        break;
+      case LIST:
+      case OBJECT:
+      case STRING:
+        v.kind = Kind.STRING;
+        v.string_val = cv.render();
+        break;
+      case NUMBER:
+        v.kind = Kind.LONG;
+        v.num_val = ((Number)cv.unwrapped()).longValue();
+        break;
+      }
+      return v;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+}
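
A minimal usage sketch for the iterator, assuming DrillConfig.create() loads the default configuration from the classpath; the class name and the printed format are invented for illustration.

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.server.options.DrillConfigIterator;
    import org.apache.drill.exec.server.options.OptionValue;

    public class PrintBootOptions {
      public static void main(String[] args) {
        // Assumed: DrillConfig.create() picks up drill-module/override config from the classpath.
        DrillConfig config = DrillConfig.create();
        for (OptionValue v : new DrillConfigIterator(config)) {
          // Every entry surfaced by the iterator is tagged with OptionType.BOOT.
          System.out.println(v.type + " " + v.name + " (" + v.kind + ")");
        }
      }
    }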

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
new file mode 100644
index 0000000..e9620db
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class FragmentOptionsManager implements OptionManager{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionsManager.class);
+
+  ImmutableMap<String, OptionValue> options;
+  OptionManager systemOptions;
+
+  public FragmentOptionsManager(OptionManager systemOptions, OptionList options){
+    Map<String, OptionValue> tmp = Maps.newHashMap();
+    for(OptionValue v : options){
+      tmp.put(v.name, v);
+    }
+    this.options = ImmutableMap.copyOf(tmp);
+    this.systemOptions = systemOptions;
+  }
+
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return Iterables.concat(systemOptions, options.values()).iterator();
+  }
+
+  @Override
+  public OptionValue getOption(String name) {
+    OptionValue opt = options.get(name);
+    // Fall back to the system options when the fragment list has no entry, mirroring SessionOptionManager.
+    return opt != null ? opt : systemOptions.getOption(name);
+  }
+
+  @Override
+  public void setOption(OptionValue value) throws SetOptionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setOption(String name, SqlLiteral literal) throws SetOptionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionAdmin getAdmin() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionManager getSystemManager() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionList getSessionOptionList() {
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
new file mode 100644
index 0000000..b6d4f8c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.ArrayList;
+
+public class OptionList extends ArrayList<OptionValue>{
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
new file mode 100644
index 0000000..3833833
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.eigenbase.sql.SqlLiteral;
+
+public interface OptionManager extends Iterable<OptionValue>{
+  public OptionValue getOption(String name);
+  public void setOption(OptionValue value) throws SetOptionException;
+  public void setOption(String name, SqlLiteral literal) throws SetOptionException;
+  public OptionAdmin getAdmin();
+  public OptionManager getSystemManager();
+  public OptionList getSessionOptionList();
+
+  public interface OptionAdmin{
+    public void registerOptionType(OptionValidator validator);
+    public void validate(OptionValue v) throws SetOptionException;
+    public OptionValue validate(String name, SqlLiteral value) throws SetOptionException;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
new file mode 100644
index 0000000..5b90ba5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server.options;
+
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.eigenbase.sql.SqlLiteral;
+
+/**
+ * Validates the values provided to Drill options.
+ */
+public abstract class OptionValidator {
+
+  // Stored here as well as in the option static class so that the option name can be
+  // included in the error messages produced by the validator.
+  private String optionName;
+
+  public OptionValidator(String optionName){
+    this.optionName = optionName;
+  }
+
+  /**
+   * This method determines if a given value is a valid setting for an option. For options that support some
+   * ambiguity in their settings, such as case-insensitivity for string options, this method returns a modified
+   * version of the passed value that is considered the standard format of the option that should be used for
+   * system-internal representation.
+   *
+   * @param value - the value to validate
+   * @return - the value requested, in its standard format to be used for representing the value within Drill
+   *            Example: all lower case values for strings, to avoid ambiguities in how values are stored
+   *            while allowing some flexibility for users
+   * @throws ExpressionParsingException - message to describe error with value, including range or list of expected values
+   */
+  public abstract OptionValue validate(SqlLiteral value) throws ExpressionParsingException;
+
+  public String getOptionName() {
+    return optionName;
+  }
+
+  public String getDefaultString(){
+    return null;
+  }
+
+
+  public abstract OptionValue getDefault();
+
+  public abstract void validate(OptionValue v) throws ExpressionParsingException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
new file mode 100644
index 0000000..7b4f7f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.apache.drill.exec.cache.JacksonSerializable;
+
+import com.google.common.base.Preconditions;
+
+
+public class OptionValue extends JacksonSerializable {
+
+  public static enum OptionType {
+    BOOT, SYSTEM, SESSION
+  }
+
+  public static enum Kind {
+    BOOLEAN, LONG, STRING, DOUBLE
+  }
+
+  public String name;
+  public Kind kind;
+  public OptionType type;
+  public Long num_val;
+  public String string_val;
+  public Boolean bool_val;
+  public Double float_val;
+
+  public static OptionValue createLong(OptionType type, String name, long val) {
+    return new OptionValue(Kind.LONG, type, name, val, null, null, null);
+  }
+
+  public static OptionValue createBoolean(OptionType type, String name, boolean bool) {
+    return new OptionValue(Kind.BOOLEAN, type, name, null, null, bool, null);
+  }
+
+  public static OptionValue createString(OptionType type, String name, String val) {
+    return new OptionValue(Kind.STRING, type, name, null, val, null, null);
+  }
+
+  public static OptionValue createDouble(OptionType type, String name, double val) {
+    return new OptionValue(Kind.DOUBLE, type, name, null, null, null, val);
+  }
+
+  public OptionValue(){}
+
+  private OptionValue(Kind kind, OptionType type, String name, Long num_val, String string_val, Boolean bool_val, Double float_val) {
+    super();
+    Preconditions.checkArgument(num_val != null || string_val != null || bool_val != null || float_val != null);
+    this.name = name;
+    this.kind = kind;
+    this.float_val = float_val;
+    this.type = type;
+    this.num_val = num_val;
+    this.string_val = string_val;
+    this.bool_val = bool_val;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((bool_val == null) ? 0 : bool_val.hashCode());
+    result = prime * result + ((float_val == null) ? 0 : float_val.hashCode());
+    result = prime * result + ((kind == null) ? 0 : kind.hashCode());
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((num_val == null) ? 0 : num_val.hashCode());
+    result = prime * result + ((string_val == null) ? 0 : string_val.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    OptionValue other = (OptionValue) obj;
+    if (bool_val == null) {
+      if (other.bool_val != null)
+        return false;
+    } else if (!bool_val.equals(other.bool_val))
+      return false;
+    if (float_val == null) {
+      if (other.float_val != null)
+        return false;
+    } else if (!float_val.equals(other.float_val))
+      return false;
+    if (kind != other.kind)
+      return false;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    if (num_val == null) {
+      if (other.num_val != null)
+        return false;
+    } else if (!num_val.equals(other.num_val))
+      return false;
+    if (string_val == null) {
+      if (other.string_val != null)
+        return false;
+    } else if (!string_val.equals(other.string_val))
+      return false;
+    if (type != other.type)
+      return false;
+    return true;
+  }
+
+
+}
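
A small standalone sketch of the factory methods and the value-based equality defined above; the option names are invented.

    import org.apache.drill.exec.server.options.OptionValue;
    import org.apache.drill.exec.server.options.OptionValue.OptionType;

    public class OptionValueExample {
      public static void main(String[] args) {
        OptionValue slices = OptionValue.createLong(OptionType.SYSTEM, "example.max_slices", 16);
        OptionValue sameSlices = OptionValue.createLong(OptionType.SYSTEM, "example.max_slices", 16);
        OptionValue exchanges = OptionValue.createBoolean(OptionType.SESSION, "example.exchanges", true);

        // equals()/hashCode() compare name, kind, type and the typed value fields.
        System.out.println(slices.equals(sameSlices)); // true
        System.out.println(slices.equals(exchanges));  // false
        System.out.println(slices.kind + " " + slices.num_val); // LONG 16
      }
    }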

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
new file mode 100644
index 0000000..993cead
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class SessionOptionManager implements OptionManager{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
+
+  private Map<String, OptionValue> options = Maps.newConcurrentMap();
+  private OptionManager systemOptions;
+
+  public SessionOptionManager(OptionManager systemOptions) {
+    super();
+    this.systemOptions = systemOptions;
+  }
+
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return Iterables.concat(systemOptions, options.values()).iterator();
+  }
+
+  @Override
+  public OptionValue getOption(String name) {
+    OptionValue opt = options.get(name);
+    if(opt == null){
+      return systemOptions.getOption(name);
+    }else{
+      return opt;
+    }
+  }
+
+  @Override
+  public void setOption(OptionValue value) {
+    systemOptions.getAdmin().validate(value);
+    setValidatedOption(value);
+  }
+
+
+  @Override
+  public OptionList getSessionOptionList() {
+    OptionList list = new OptionList();
+    for(OptionValue o : options.values()){
+      list.add(o);
+    }
+    return list;
+  }
+
+  private void setValidatedOption(OptionValue value){
+    if(value.type == OptionType.SYSTEM){
+      systemOptions.setOption(value);
+    }else{
+      options.put(value.name, value);
+    }
+  }
+
+  @Override
+  public void setOption(String name, SqlLiteral literal) {
+    OptionValue val = systemOptions.getAdmin().validate(name,  literal);
+    val.type = OptionValue.OptionType.SESSION;
+    setValidatedOption(val);
+  }
+
+  @Override
+  public OptionAdmin getAdmin() {
+    return systemOptions.getAdmin();
+  }
+
+  @Override
+  public OptionManager getSystemManager() {
+    return systemOptions;
+  }
+
+
+}
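
A minimal sketch of the session-over-system layering. Since SystemOptionManager needs a DistributedCache, a tiny in-memory OptionManager with a no-op admin stands in for it here; the stand-in class and the option name are invented for illustration.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import org.apache.drill.exec.server.options.OptionList;
    import org.apache.drill.exec.server.options.OptionManager;
    import org.apache.drill.exec.server.options.OptionValidator;
    import org.apache.drill.exec.server.options.OptionValue;
    import org.apache.drill.exec.server.options.OptionValue.OptionType;
    import org.apache.drill.exec.server.options.SessionOptionManager;
    import org.eigenbase.sql.SqlLiteral;

    public class SessionLayeringSketch {

      // Minimal in-memory OptionManager standing in for SystemOptionManager; validation is a no-op.
      static class InMemorySystemOptions implements OptionManager {
        private final Map<String, OptionValue> map = new HashMap<String, OptionValue>();
        private final OptionAdmin admin = new OptionAdmin() {
          public void registerOptionType(OptionValidator validator) { }
          public void validate(OptionValue v) { /* accept everything in this sketch */ }
          public OptionValue validate(String name, SqlLiteral value) {
            throw new UnsupportedOperationException();
          }
        };

        public Iterator<OptionValue> iterator() { return map.values().iterator(); }
        public OptionValue getOption(String name) { return map.get(name); }
        public void setOption(OptionValue value) { map.put(value.name, value); }
        public void setOption(String name, SqlLiteral literal) {
          throw new UnsupportedOperationException();
        }
        public OptionAdmin getAdmin() { return admin; }
        public OptionManager getSystemManager() { return this; }
        public OptionList getSessionOptionList() { return new OptionList(); }
      }

      public static void main(String[] args) {
        InMemorySystemOptions system = new InMemorySystemOptions();
        system.setOption(OptionValue.createLong(OptionType.SYSTEM, "example.threshold", 100));

        SessionOptionManager session = new SessionOptionManager(system);
        // No session override yet, so the lookup falls through to the system value.
        System.out.println(session.getOption("example.threshold").num_val); // 100

        // A SESSION-typed value is kept in the session map and shadows the system value.
        session.setOption(OptionValue.createLong(OptionType.SESSION, "example.threshold", 5));
        System.out.println(session.getOption("example.threshold").num_val); // 5
        System.out.println(system.getOption("example.threshold").num_val);  // 100, unchanged
      }
    }

The session copy shadows the system value for lookups while leaving the system value untouched, which is the scoping behavior session-level SET relies on.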

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
new file mode 100644
index 0000000..dd698c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Set;
+
+import javax.validation.ConstraintViolation;
+
+import org.apache.drill.common.exceptions.LogicalPlanParsingException;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalOperatorBase;
+
+public class SetOptionException extends LogicalPlanParsingException{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionException.class);
+
+  public SetOptionException() {
+    super();
+
+  }
+
+  public SetOptionException(LogicalOperator operator, Set<ConstraintViolation<LogicalOperatorBase>> violations) {
+    super(operator, violations);
+
+  }
+
+  public SetOptionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+
+  }
+
+  public SetOptionException(String message, Throwable cause) {
+    super(message, cause);
+
+  }
+
+  public SetOptionException(String message) {
+    super(message);
+
+  }
+
+  public SetOptionException(Throwable cause) {
+    super(cause);
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
new file mode 100644
index 0000000..98975e4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Maps;
+
+public class SystemOptionManager implements OptionManager{
+
+  private final OptionValidator[] VALIDATORS = {
+      PlannerSettings.EXCHANGE
+  };
+
+  private DistributedMap<OptionValue> options;
+  private SystemOptionAdmin admin;
+  private final ConcurrentMap<String, OptionValidator> knownOptions = Maps.newConcurrentMap();
+  private DistributedCache cache;
+
+  public SystemOptionManager(DistributedCache cache){
+    this.cache = cache;
+  }
+
+  public void init(){
+    this.options = cache.getNamedMap("system.options", OptionValue.class);
+    this.admin = new SystemOptionAdmin();
+  }
+
+  private class Iter implements Iterator<OptionValue>{
+    private Iterator<Map.Entry<String, OptionValue>> inner;
+
+    public Iter(Iterator<Map.Entry<String, OptionValue>> inner){
+      this.inner = inner;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return inner.hasNext();
+    }
+
+    @Override
+    public OptionValue next() {
+      return inner.next().getValue();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return new Iter(options.iterator());
+  }
+
+  @Override
+  public OptionValue getOption(String name) {
+    return options.get(name);
+  }
+
+  @Override
+  public void setOption(OptionValue value) {
+    admin.validate(value);
+    assert value.type == OptionType.SYSTEM;
+    options.put(value.name, value);
+  }
+
+  @Override
+  public void setOption(String name, SqlLiteral literal) {
+    OptionValue v = admin.validate(name, literal);
+    v.type = OptionValue.OptionType.SYSTEM;
+    options.put(name, v);
+  }
+
+  @Override
+  public OptionList getSessionOptionList() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionManager getSystemManager() {
+    return this;
+  }
+
+  @Override
+  public OptionAdmin getAdmin() {
+    return admin;
+  }
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class);
+
+
+  private class SystemOptionAdmin implements OptionAdmin{
+
+    public SystemOptionAdmin(){
+      for(OptionValidator v : VALIDATORS){
+        knownOptions.put(v.getOptionName(), v);
+        options.putIfAbsent(v.getOptionName(), v.getDefault());
+      }
+    }
+
+
+    @Override
+    public void registerOptionType(OptionValidator validator) {
+      if(null != knownOptions.putIfAbsent(validator.getOptionName(), validator) ){
+        throw new IllegalArgumentException("Only one option is allowed to be registered with name: " + validator.getOptionName());
+      }
+    }
+
+    @Override
+    public void validate(OptionValue v) throws SetOptionException {
+      OptionValidator validator = knownOptions.get(v.name);
+      if(validator == null) throw new SetOptionException("Unknown option " + v.name);
+      validator.validate(v);
+    }
+
+    @Override
+    public OptionValue validate(String name, SqlLiteral value) throws SetOptionException {
+      OptionValidator validator = knownOptions.get(name);
+      if(validator == null) throw new SetOptionException("Unknown option " + name);
+      return validator.validate(value);
+    }
+
+
+
+  }
+
+
+}
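
A rough sketch of the registration and validation flow above, with a plain ConcurrentHashMap standing in for the cache-backed DistributedMap and an invented option name; it mirrors what SystemOptionAdmin's constructor and setOption(OptionValue) do, but is not the real code path.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.drill.exec.server.options.OptionValidator;
    import org.apache.drill.exec.server.options.OptionValue;
    import org.apache.drill.exec.server.options.OptionValue.OptionType;
    import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;

    public class SystemOptionsFlowSketch {
      public static void main(String[] args) throws Exception {
        OptionValidator[] validators = { new PositiveLongValidator("example.width", 1000, 10) };

        ConcurrentMap<String, OptionValidator> known = new ConcurrentHashMap<String, OptionValidator>();
        ConcurrentMap<String, OptionValue> options = new ConcurrentHashMap<String, OptionValue>();

        // Mirrors SystemOptionAdmin's constructor: register each validator and seed its default.
        for (OptionValidator v : validators) {
          known.put(v.getOptionName(), v);
          options.putIfAbsent(v.getOptionName(), v.getDefault());
        }
        System.out.println(options.get("example.width").num_val); // 10, the seeded default

        // Mirrors setOption(OptionValue): validate against the registered validator, then store.
        OptionValue update = OptionValue.createLong(OptionType.SYSTEM, "example.width", 100);
        known.get("example.width").validate(update);
        options.put(update.name, update);
        System.out.println(options.get("example.width").num_val); // 100
      }
    }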

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
new file mode 100644
index 0000000..0d681ea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+public class TypeValidators {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeValidators.class);
+
+  public static class PositiveLongValidator extends LongValidator {
+
+    private final long max;
+
+    public PositiveLongValidator(String name, long max, long def) {
+      super(name, def);
+      this.max = max;
+    }
+
+    @Override
+    public void extraValidate(OptionValue v) throws ExpressionParsingException {
+      if (v.num_val > max || v.num_val < 0)
+        throw new ExpressionParsingException(String.format("Option %s must be between %d and %d.", getOptionName(), 0,
+            max));
+    }
+  }
+
+  public static class BooleanValidator extends TypeValidator{
+    public BooleanValidator(String name, boolean def){
+      super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def));
+    }
+  }
+  public static class StringValidator extends TypeValidator{
+    public StringValidator(String name, String def){
+      super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def));
+    }
+
+  }
+  public static class LongValidator extends TypeValidator{
+    public LongValidator(String name, long def){
+      super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def));
+    }
+  }
+  public static class DoubleValidator extends TypeValidator{
+
+    public DoubleValidator(String name, double def){
+      super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def));
+    }
+
+
+  }
+
+  public static abstract class TypeValidator extends OptionValidator {
+    final Kind kind;
+    private OptionValue defaultValue;
+
+    public TypeValidator(String name, Kind kind, OptionValue defValue) {
+      super(name);
+      this.kind = kind;
+      this.defaultValue = defValue;
+    }
+
+    @Override
+    public OptionValue getDefault() {
+      return defaultValue;
+    }
+
+    @Override
+    public OptionValue validate(SqlLiteral value) throws ExpressionParsingException {
+      OptionValue op = getPartialValue(getOptionName(), (OptionType) null, value);
+      validate(op);
+      return op;
+    }
+
+    @Override
+    public final void validate(OptionValue v) throws ExpressionParsingException {
+      if (v.kind != kind)
+        throw new ExpressionParsingException(String.format("Option %s must be of type %s but you tried to set it to a %s.",
+            getOptionName(), kind.name(), v.kind.name()));
+      // Give subclasses such as PositiveLongValidator a chance to apply their extra checks.
+      extraValidate(v);
+    }
+
+    public void extraValidate(OptionValue v) throws ExpressionParsingException {
+    }
+
+  }
+
+  public static OptionValue getPartialValue(String name, OptionType type, SqlLiteral literal) {
+    switch (literal.getTypeName()) {
+    case DECIMAL:
+    case DOUBLE:
+    case FLOAT:
+      return OptionValue.createDouble(type, name, ((BigDecimal) literal.getValue()).doubleValue());
+
+    case SMALLINT:
+    case TINYINT:
+    case BIGINT:
+    case INTEGER:
+      return OptionValue.createLong(type, name, ((BigDecimal) literal.getValue()).longValue());
+
+    case VARBINARY:
+    case VARCHAR:
+    case CHAR:
+      return OptionValue.createString(type, name, (String) literal.getValue());
+
+    case BOOLEAN:
+      return OptionValue.createBoolean(type, name, (Boolean) literal.getValue());
+
+    }
+
+    throw new ExpressionParsingException(String.format(
+        "Drill doesn't support set option expressions with literals of type %s.", literal.getTypeName()));
+  }
+}
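
A small sketch of the kind check enforced by TypeValidator.validate(OptionValue), using an invented option name; the first call is rejected because a LONG value is offered for a BOOLEAN option.

    import org.apache.drill.common.exceptions.ExpressionParsingException;
    import org.apache.drill.exec.server.options.OptionValue;
    import org.apache.drill.exec.server.options.OptionValue.OptionType;
    import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;

    public class ValidatorTypeCheckSketch {
      public static void main(String[] args) throws Exception {
        BooleanValidator flag = new BooleanValidator("example.flag", true);

        OptionValue wrongKind = OptionValue.createLong(OptionType.SESSION, "example.flag", 1);
        try {
          flag.validate(wrongKind);
        } catch (ExpressionParsingException e) {
          System.out.println(e.getMessage()); // type mismatch is reported with the option name
        }

        // A BOOLEAN value passes the kind check, regardless of its option type.
        flag.validate(OptionValue.createBoolean(OptionType.SESSION, "example.flag", false));
        System.out.println("default: " + flag.getDefault().bool_val); // true
      }
    }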

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 4d88686..948c74f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import com.google.common.base.Preconditions;
 import net.hydromatic.linq4j.expressions.DefaultExpression;
 import net.hydromatic.linq4j.expressions.Expression;
 import net.hydromatic.optiq.SchemaPlus;
@@ -40,10 +39,9 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.util.PathScanner;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.rpc.user.DrillUser;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -51,9 +49,12 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
 import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
 import org.eigenbase.relopt.RelOptRule;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
@@ -91,9 +92,9 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
       int i =0;
       for(Constructor<?> c : plugin.getConstructors()){
         Class<?>[] params = c.getParameterTypes();
-        if(params.length != 3 
-            || params[1] != DrillbitContext.class 
-            || !StoragePluginConfig.class.isAssignableFrom(params[0]) 
+        if(params.length != 3
+            || params[1] != DrillbitContext.class
+            || !StoragePluginConfig.class.isAssignableFrom(params[0])
             || params[2] != String.class){
           logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
           continue;
@@ -109,7 +110,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
     // create registered plugins defined in "storage-plugins.json"
     this.plugins = ImmutableMap.copyOf(createPlugins());
 
-    // query registered engines for optimizer rules and build the storage plugin RuleSet 
+    // query registered engines for optimizer rules and build the storage plugin RuleSet
     Builder<RelOptRule> setBuilder = ImmutableSet.builder();
     for (StoragePlugin plugin : this.plugins.values()) {
       Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
@@ -135,17 +136,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
         String pluginsData = Resources.toString(url, Charsets.UTF_8);
         plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
       }
-      DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
-      StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
-      if (cachedPluginsSerializable != null) {
-        cachedPlugins = cachedPluginsSerializable.getObj();
+      DistributedMap<StoragePlugins> map = context.getCache().getMap(StoragePlugins.class);
+      cachedPlugins = map.get("storage-plugins");
+      if (cachedPlugins != null) {
         logger.debug("Found cached storage plugin config: {}", cachedPlugins);
       } else {
         Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
         logger.debug("caching storage plugin config {}", plugins);
-        map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
-        cachedPluginsSerializable = map.get("storage-plugins");
-        cachedPlugins = cachedPluginsSerializable.getObj();
+        map.put("storage-plugins", plugins);
+        cachedPlugins = map.get("storage-plugins");
       }
       if(!(plugins == null || cachedPlugins.equals(plugins))) {
         logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
@@ -155,7 +154,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
     }catch(IOException e){
       throw new IllegalStateException("Failure while reading storage plugins data.", e);
     }
-    
+
     for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
       try{
         StoragePlugin plugin = create(config.getKey(), config.getValue());
@@ -165,14 +164,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
       }
     }
     activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
-    
+    activePlugins.put("sys", new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, "sys"));
+
     return activePlugins;
   }
 
   public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
     return plugins.get(registeredStoragePluginName);
   }
-  
+
   public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
     if(config instanceof NamedStoragePluginConfig){
       return plugins.get(((NamedStoragePluginConfig) config).name);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
new file mode 100644
index 0000000..2c75651
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import net.hydromatic.linq4j.expressions.DefaultExpression;
+import net.hydromatic.linq4j.expressions.Expression;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.RuleSet;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.eigenbase.relopt.RelOptRule;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.io.Resources;
+import org.apache.drill.exec.store.options.OptionValueStorageConfig;
+import org.apache.drill.exec.store.options.OptionValueStoragePlugin;
+
+
+public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
+
+  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+  private ImmutableMap<String, StoragePlugin> plugins;
+
+  private DrillbitContext context;
+  private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
+
+  private RuleSet storagePluginsRuleSet;
+
+  private static final Expression EXPRESSION = new DefaultExpression(Object.class);
+
+  public StoragePluginRegistry(DrillbitContext context) {
+    try{
+    this.context = context;
+    }catch(RuntimeException e){
+      logger.error("Failure while loading storage plugin registry.", e);
+      throw new RuntimeException("Faiure while reading and loading storage plugin configuration.", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void init() throws DrillbitStartupException {
+    DrillConfig config = context.getConfig();
+    Collection<Class<? extends StoragePlugin>> plugins = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage plugins {}", plugins);
+    for(Class<? extends StoragePlugin> plugin: plugins){
+      int i =0;
+      for(Constructor<?> c : plugin.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        if(params.length != 3 
+            || params[1] != DrillbitContext.class 
+            || !StoragePluginConfig.class.isAssignableFrom(params[0]) 
+            || params[2] != String.class){
+          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+          continue;
+        }
+        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+        i++;
+      }
+      if(i == 0){
+        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters of (StorangePluginConfig, Config)", plugin.getCanonicalName());
+      }
+    }
+
+    // create registered plugins defined in "storage-plugins.json"
+    this.plugins = ImmutableMap.copyOf(createPlugins());
+
+    // query registered engines for optimizer rules and build the storage plugin RuleSet 
+    Builder<RelOptRule> setBuilder = ImmutableSet.builder();
+    for (StoragePlugin plugin : this.plugins.values()) {
+      Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
+      if (rules != null && rules.size() > 0) {
+        setBuilder.addAll(rules);
+      }
+    }
+    this.storagePluginsRuleSet = DrillRuleSets.create(setBuilder.build());
+  }
+
+  private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+    /*
+     * Check if "storage-plugins.json" exists. Also check if "storage-plugins" object exists in Distributed Cache.
+     * If both exist, check that they are the same. If they differ, throw exception. If "storage-plugins.json" exists, but
+     * nothing found in cache, then add it to the cache. If neither are found, throw exception.
+     */
+    StoragePlugins plugins = null;
+    StoragePlugins cachedPlugins = null;
+    Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
+    try{
+      URL url = Resources.class.getClassLoader().getResource("storage-plugins.json");
+      if (url != null) {
+        String pluginsData = Resources.toString(url, Charsets.UTF_8);
+        plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
+      }
+      DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
+      StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
+      if (cachedPluginsSerializable != null) {
+        cachedPlugins = cachedPluginsSerializable.getObj();
+        logger.debug("Found cached storage plugin config: {}", cachedPlugins);
+      } else {
+        Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
+        logger.debug("caching storage plugin config {}", plugins);
+        map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
+        cachedPluginsSerializable = map.get("storage-plugins");
+        cachedPlugins = cachedPluginsSerializable.getObj();
+      }
+      if(!(plugins == null || cachedPlugins.equals(plugins))) {
+        logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
+        throw new DrillbitStartupException("Storage plugin config mismatch");
+      }
+      logger.debug("using plugin config: {}", cachedPlugins);
+    }catch(IOException e){
+      throw new IllegalStateException("Failure while reading storage plugins data.", e);
+    }
+    
+    for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
+      try{
+        StoragePlugin plugin = create(config.getKey(), config.getValue());
+        activePlugins.put(config.getKey(), plugin);
+      }catch(ExecutionSetupException e){
+        logger.error("Failure while setting up StoragePlugin with name: '{}'.", config.getKey(), e);
+      }
+    }
+<<<<<<< HEAD
+    activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+    
+    return activePlugins;
+=======
+    activeEngines.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+    activeEngines.put("OPTIONS", new OptionValueStoragePlugin(new OptionValueStorageConfig(), context, "OPTIONS"));
+
+    return activeEngines;
+>>>>>>> Drill 381 - implementation of session and global options.
+  }
+
+  public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
+    return plugins.get(registeredStoragePluginName);
+  }
+  
+  public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+    if(config instanceof NamedStoragePluginConfig){
+      return plugins.get(((NamedStoragePluginConfig) config).name);
+    }else{
+      // TODO: for now, we'll throw away transient configs.  we really ought to clean these up.
+      return create(null, config);
+    }
+  }
+
+  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException{
+    StoragePlugin p = getPlugin(storageConfig);
+    if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin.  The actual type of plugin was %s.", p.getClass().getName()));
+    FileSystemPlugin storage = (FileSystemPlugin) p;
+    return storage.getFormatPlugin(formatConfig);
+  }
+
+  private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+    StoragePlugin plugin = null;
+    Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
+    if (c == null)
+      throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+          pluginConfig));
+    try {
+      plugin = c.newInstance(pluginConfig, context, name);
+      return plugin;
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
+      if (t instanceof ExecutionSetupException)
+        throw ((ExecutionSetupException) t);
+      throw new ExecutionSetupException(String.format(
+          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+    }
+  }
+
+  @Override
+  public Iterator<Entry<String, StoragePlugin>> iterator() {
+    return plugins.entrySet().iterator();
+  }
+
+  public RuleSet getStoragePluginRuleSet() {
+    return storagePluginsRuleSet;
+  }
+
+  public DrillSchemaFactory getSchemaFactory(){
+    return schemaFactory;
+  }
+
+  public class DrillSchemaFactory implements SchemaFactory{
+
+    @Override
+    public void registerSchemas(DrillUser user, SchemaPlus parent) {
+      for(Map.Entry<String, StoragePlugin> e : plugins.entrySet()){
+        e.getValue().registerSchemas(user, parent);
+      }
+    }
+
+  }
+
+}
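
One convention visible in the registry's init() above: only plugin classes exposing a constructor of the shape (some StoragePluginConfig subtype, DrillbitContext, String) are registered; any other constructor is skipped with a log message. A hedged skeleton of a conforming plugin, using the purely hypothetical names MyPlugin and MyPluginConfig:

    // Hypothetical skeleton: the constructor signature the registry scans for and
    // later invokes reflectively in create(name, pluginConfig).
    public class MyPlugin extends AbstractStoragePlugin {
      public MyPlugin(MyPluginConfig config, DrillbitContext context, String name) {
        // capture config, context and name here
      }
      // getConfig() and any other required overrides elided
    }

SystemTablePlugin, added later in this commit, follows exactly this shape.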

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 256b6b6..eed4f03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
@@ -83,4 +84,10 @@ public class DirectGroupScan extends AbstractGroupScan{
   public String getDigest() {
     return String.valueOf(reader);
   }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index 8ae5116..84b6690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
@@ -74,13 +76,13 @@ public class MockGroupScanPOP extends AbstractGroupScan {
   public List<MockScanEntry> getReadEntries() {
     return readEntries;
   }
-  
+
   public static class MockScanEntry{
 
     private final int records;
     private final MockColumn[] types;
     private final int recordSize;
-    
+
 
     @JsonCreator
     public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
@@ -97,7 +99,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public OperatorCost getCost() {
       return new OperatorCost(1, 2, 1, 1);
     }
-    
+
     public int getRecords() {
       return records;
     }
@@ -116,7 +118,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
       return "MockScanEntry [records=" + records + ", columns=" + Arrays.toString(types) + "]";
     }
   }
-  
+
   @JsonInclude(Include.NON_NULL)
   public static class MockColumn{
     @JsonProperty("type") public MinorType minorType;
@@ -125,8 +127,8 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public Integer width;
     public Integer precision;
     public Integer scale;
-    
-    
+
+
     @JsonCreator
     public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
       this.name = name;
@@ -136,7 +138,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
       this.precision = precision;
       this.scale = scale;
     }
-    
+
     @JsonProperty("type")
     public MinorType getMinorType() {
       return minorType;
@@ -156,7 +158,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public Integer getScale() {
       return scale;
     }
-    
+
     @JsonIgnore
     public MajorType getMajorType(){
       MajorType.Builder b = MajorType.newBuilder();
@@ -172,7 +174,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public String toString() {
       return "MockColumn [minorType=" + minorType + ", name=" + name + ", mode=" + mode + "]";
     }
-    
+
   }
 
   @Override
@@ -184,7 +186,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
   @Override
   public void applyAssignments(List<DrillbitEndpoint> endpoints) {
     Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-    
+
     mappings = new LinkedList[endpoints.size()];
 
     int i =0;
@@ -230,6 +232,11 @@ public class MockGroupScanPOP extends AbstractGroupScan {
   }
 
   @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
+  @Override
   public String getDigest() {
     return toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
index f0d4fb6..0c9a1cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
@@ -48,7 +48,7 @@ abstract class AbstractWriter<V extends ValueVector> implements PojoWriter{
 
   @Override
   public void allocate() {
-    AllocationHelper.allocate(vector, 500, 100);
+    vector.allocateNew();
   }
 
   public void setValueCount(int valueCount){
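
The change above drops the fixed AllocationHelper.allocate(vector, 500, 100) sizing and lets each writer's value vector size itself via allocateNew(). For context, a rough sketch of the per-batch lifecycle these PojoWriter methods suggest, assuming the reader drives them in this order (the loop and variable names are illustrative; checked reflection exceptions are elided):

    writer.allocate();                       // size the underlying value vector
    int row = 0;
    for (Object pojo : pojosForThisBatch) {
      if (!writer.writeField(pojo, row)) {   // setSafe(...) returned false: out of room
        break;                               // stop here and emit the batch
      }
      row++;
    }
    writer.setValueCount(row);               // record how many values were written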

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
new file mode 100644
index 0000000..8ecb29f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import com.google.common.collect.Lists;
+
+public class PojoDataType {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
+
+  public List<SqlTypeName> types = Lists.newArrayList();
+  public List<String> names = Lists.newArrayList();
+
+  public PojoDataType(Class<?> pojoClass){
+    logger.debug(pojoClass.getName());
+    Field[] fields = pojoClass.getDeclaredFields();
+    for(int i = 0; i < fields.length; i++){
+      Field f = fields[i];
+
+      Class<?> type = f.getType();
+      names.add(f.getName());
+
+      if(type == int.class || type == Integer.class){
+        types.add(SqlTypeName.INTEGER);
+      }else if(type == boolean.class || type == Boolean.class){
+        types.add(SqlTypeName.BOOLEAN);
+      }else if(type == long.class || type == Long.class){
+        types.add(SqlTypeName.BIGINT);
+      }else if(type == double.class || type == Double.class){
+        types.add(SqlTypeName.DOUBLE);
+      }else if(type == String.class){
+        types.add(SqlTypeName.VARCHAR);
+      }else if(type.isEnum()){
+        types.add(SqlTypeName.VARCHAR);
+      }else{
+        throw new RuntimeException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type));
+      }
+    }
+  }
+
+
+  public RelDataType getRowType(RelDataTypeFactory f){
+    List<RelDataType> fields = Lists.newArrayList();
+    for(SqlTypeName n : types){
+      fields.add(f.createSqlType(n));
+    }
+    return f.createStructType(fields, names);
+  }
+
+
+}
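
PojoDataType gives the planner a row type for a POJO-backed table by walking the class's declared fields and mapping each Java type to a SqlTypeName: int/Integer to INTEGER, boolean/Boolean to BOOLEAN, long/Long to BIGINT, double/Double to DOUBLE, String and enums to VARCHAR, anything else throws. A small hypothetical POJO and the mapping it would produce:

    // Hypothetical POJO, used only to illustrate the mapping above.
    public class EndpointStatus {
      public String host;     // -> VARCHAR
      public int user_port;   // -> INTEGER
      public Long uptime_ms;  // -> BIGINT
      public double load;     // -> DOUBLE
    }
    // new PojoDataType(EndpointStatus.class) yields
    //   names = [host, user_port, uptime_ms, load]
    //   types = [VARCHAR, INTEGER, BIGINT, DOUBLE]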

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 8dac455..4203abc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -25,8 +25,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.pojo.Writers.BitWriter;
+import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.EnumWriter;
 import org.apache.drill.exec.store.pojo.Writers.IntWriter;
 import org.apache.drill.exec.store.pojo.Writers.LongWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter;
+import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
 import org.apache.drill.exec.store.pojo.Writers.StringWriter;
 
 public class PojoRecordReader<T> implements RecordReader{
@@ -48,7 +54,7 @@ public class PojoRecordReader<T> implements RecordReader{
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try{
-      Field[] fields = pojoClass.getFields();
+      Field[] fields = pojoClass.getDeclaredFields();
       writers = new PojoWriter[fields.length];
       for(int i = 0; i < writers.length; i++){
         Field f = fields[i];
@@ -56,6 +62,18 @@ public class PojoRecordReader<T> implements RecordReader{
 
         if(type == int.class){
           writers[i] = new IntWriter(f);
+        }else if(type == Integer.class){
+          writers[i] = new NIntWriter(f);
+        }else if(type == Long.class){
+          writers[i] = new NBigIntWriter(f);
+        }else if(type == Boolean.class){
+          writers[i] = new NBooleanWriter(f);
+        }else if(type == double.class){
+          writers[i] = new DoubleWriter(f);
+        }else if(type == Double.class){
+          writers[i] = new NDoubleWriter(f);
+        }else if(type.isEnum()){
+          writers[i] = new EnumWriter(f);
         }else if(type == boolean.class){
           writers[i] = new BitWriter(f);
         }else if(type == long.class){
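
With setup() now reading getDeclaredFields() (matching PojoDataType) and the extra branches added here, the field-type-to-writer selection works out roughly as follows, combining this hunk with the Writers changes in the next file:

    // POJO field type -> writer chosen in setup()
    // int      -> IntWriter                        long    -> LongWriter
    // boolean  -> BitWriter                        double  -> DoubleWriter (required FLOAT8)
    // Integer  -> NIntWriter (nullable INT)        Long    -> NBigIntWriter (nullable BIGINT)
    // Boolean  -> NBooleanWriter (nullable BIT)    Double  -> NDoubleWriter (nullable FLOAT8)
    // String   -> StringWriter (nullable VARCHAR)  enums   -> EnumWriter (nullable VARCHAR, stores name())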

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
index 6910903..b986be8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
@@ -27,7 +27,12 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 
 import com.google.common.base.Charsets;
@@ -81,18 +86,32 @@ public class Writers {
 
   }
 
-  public static class StringWriter extends AbstractWriter<NullableVarCharVector>{
+  public static class DoubleWriter extends AbstractWriter<Float8Vector>{
 
+    public DoubleWriter(Field field) {
+      super(field, Types.required(MinorType.FLOAT8));
+      if(field.getType() != double.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      double d = field.getDouble(pojo);
+
+      return vector.getMutator().setSafe(outboundIndex, d);
+    }
+
+  }
+
+  private abstract static class AbstractStringWriter extends AbstractWriter<NullableVarCharVector>{
     private ByteBuf data;
     private final NullableVarCharHolder h = new NullableVarCharHolder();
 
-    public StringWriter(Field field) {
+    public AbstractStringWriter(Field field) {
       super(field, Types.optional(MinorType.VARCHAR));
-      if(field.getType() != String.class) throw new IllegalStateException();
       ensureLength(100);
     }
 
-    private void ensureLength(int len){
+    void ensureLength(int len){
       if(data == null || data.capacity() < len){
         if(data != null) data.release();
         data = UnpooledByteBufAllocator.DEFAULT.buffer(len);
@@ -103,11 +122,9 @@ public class Writers {
       data.release();
     }
 
-    @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      String s = (String) field.get(pojo);
+    public boolean writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       if(s == null){
-        h.isSet = 0;
+        return true;
       }else{
         h.isSet = 1;
         byte[] bytes = s.getBytes(Charsets.UTF_8);
@@ -117,10 +134,108 @@ public class Writers {
         h.buffer = data;
         h.start = 0;
         h.end = bytes.length;
+        return vector.getMutator().setSafe(outboundIndex, h);
+
+      }
+
+    }
+
+  }
+
+  public static class EnumWriter extends AbstractStringWriter{
+    public EnumWriter(Field field) {
+      super(field);
+      if(!field.getType().isEnum()) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Enum<?> e= ((Enum<?>) field.get(pojo));
+      if(e == null) return true;
+      return writeString(e.name(), outboundIndex);
+    }
+  }
+
+  public static class StringWriter extends AbstractStringWriter {
+    public StringWriter(Field field) {
+      super(field);
+      if(field.getType() != String.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      String s = (String) field.get(pojo);
+      return writeString(s, outboundIndex);
+    }
+  }
+
+  public static class NIntWriter extends AbstractWriter<NullableIntVector>{
+
+    public NIntWriter(Field field) {
+      super(field, Types.optional(MinorType.INT));
+      if(field.getType() != Integer.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Integer i = (Integer) field.get(pojo);
+      if(i != null){
+        return vector.getMutator().setSafe(outboundIndex, i);
+      }
+      return true;
+    }
+
+  }
 
+  public static class NBigIntWriter extends AbstractWriter<NullableBigIntVector>{
+
+    public NBigIntWriter(Field field) {
+      super(field, Types.optional(MinorType.BIGINT));
+      if(field.getType() != Long.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Long o = (Long) field.get(pojo);
+      if(o != null){
+        return vector.getMutator().setSafe(outboundIndex, o);
       }
+      return true;
+    }
 
-      return vector.getMutator().setSafe(outboundIndex, h);
+  }
+
+  public static class NBooleanWriter extends AbstractWriter<NullableBitVector>{
+
+    public NBooleanWriter(Field field) {
+      super(field, Types.optional(MinorType.BIT));
+      if(field.getType() != Boolean.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Boolean o = (Boolean) field.get(pojo);
+      if(o != null){
+        return vector.getMutator().setSafe(outboundIndex, o ? 1 : 0);
+      }
+      return true;
+    }
+
+  }
+  public static class NDoubleWriter extends AbstractWriter<NullableFloat8Vector>{
+
+    public NDoubleWriter(Field field) {
+      super(field, Types.optional(MinorType.FLOAT8));
+      if(field.getType() != Double.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Double o = (Double) field.get(pojo);
+      if(o != null){
+        return vector.getMutator().setSafe(outboundIndex, o);
+      }
+      return true;
     }
 
   }
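
A behavioural note on the nullable writers introduced here: when the POJO field is null, writeField returns true without calling setSafe at all, so the slot is simply left unset and the nullable vector reports it as NULL (this also replaces the earlier explicit h.isSet = 0 path in the string writer). A small hypothetical illustration, with the vector wiring elided:

    // Hypothetical POJO; the NIntWriter is assumed to be already bound to its vector.
    public class Row { public Integer count; }

    Row r = new Row();               // count == null
    nIntWriter.writeField(r, 0);     // returns true, writes nothing -> row 0 reads as NULL
    r.count = 42;
    nIntWriter.writeField(r, 1);     // vector.getMutator().setSafe(1, 42) -> row 1 holds 42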

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
new file mode 100644
index 0000000..844fd68
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class DrillbitIterator implements Iterator<Object> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitIterator.class);
+
+  private Iterator<DrillbitEndpoint> endpoints;
+
+  public DrillbitIterator(FragmentContext c) {
+    this.endpoints = c.getDrillbitContext().getBits().iterator();
+  }
+
+  public static class DrillbitInstance {
+    public String host;
+    public int user_port;
+    public int control_port;
+    public int data_port;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return endpoints.hasNext();
+  }
+
+  @Override
+  public Object next() {
+    DrillbitEndpoint ep = endpoints.next();
+    DrillbitInstance i = new DrillbitInstance();
+    i.host = ep.getAddress();
+    i.user_port = ep.getUserPort();
+    i.control_port = ep.getControlPort();
+    i.data_port = ep.getDataPort();
+    return i;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}
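
DrillbitIterator backs the new sys.drillbits table: each DrillbitEndpoint known to the DrillbitContext is flattened into a DrillbitInstance POJO whose public fields (host, user_port, control_port, data_port) become the table's columns via PojoDataType and PojoRecordReader. A minimal sketch of consuming it directly, assuming a FragmentContext is at hand:

    // Sketch: walk cluster membership the same way the sys.drillbits scan does.
    Iterator<Object> it = new DrillbitIterator(fragmentContext);
    while (it.hasNext()) {
      DrillbitIterator.DrillbitInstance bit = (DrillbitIterator.DrillbitInstance) it.next();
      System.out.println(bit.host + ":" + bit.user_port);
    }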

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
new file mode 100644
index 0000000..c1e8dd1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public class StaticDrillTable extends DrillTable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
+
+  private final PojoDataType type;
+
+  public StaticDrillTable(PojoDataType type, String storageEngineName, StoragePlugin plugin, Object selection) {
+    super(storageEngineName, plugin, selection);
+    this.type = type;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return type.getRowType(typeFactory);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
new file mode 100644
index 0000000..4301f12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public enum SystemTable {
+  OPTION("options", OptionValue.class),
+  DRILLBITS("drillbits", DrillbitIterator.DrillbitInstance.class)
+  ;
+
+  private final PojoDataType type;
+  private final String tableName;
+  private final Class<?> pojoClass;
+
+  SystemTable(String tableName, Class<?> clazz){
+    this.type = new PojoDataType(clazz);
+    this.tableName = tableName;
+    this.pojoClass = clazz;
+  }
+
+  public String getTableName(){
+    return tableName;
+  }
+
+  public RelDataType getRowType(RelDataTypeFactory f){
+    return type.getRowType(f);
+  }
+
+  public PojoDataType getType(){
+    return type;
+  }
+
+  public Class<?> getPojoClass(){
+    return pojoClass;
+  }
+}
\ No newline at end of file
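
SystemTable is the single place a system table is declared: an enum constant carries the table name plus the POJO class whose fields define both the row type (via PojoDataType) and the record shape. Extending the sys schema would therefore look roughly like the following, where VERSION and VersionInfo are purely hypothetical:

    // Hypothetical extension, not part of this patch.
    public enum SystemTable {
      OPTION("options", OptionValue.class),
      DRILLBITS("drillbits", DrillbitIterator.DrillbitInstance.class),
      VERSION("version", VersionInfo.class);  // new table = name + backing POJO
      // constructor and accessors unchanged from the file above
    }
    // SystemTablePlugin.getRecordIterator() (next file) would also need a matching
    // case returning an Iterator<Object> over VersionInfo instances.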

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
new file mode 100644
index 0000000..a1bec1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+public class SystemTableBatchCreator implements BatchCreator<SystemTableScan>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Override
+  public RecordBatch getBatch(FragmentContext context, SystemTableScan scan, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Iterator<Object> iter = scan.getPlugin().getRecordIterator(context, scan.getTable());
+    PojoRecordReader reader = new PojoRecordReader(scan.getTable().getPojoClass(), iter);
+
+    return new ScanBatch(scan, context, Collections.singleton( (RecordReader) reader).iterator());
+  }
+}


[4/6] DRILL-381: Implement SYSTEM and SESSION options.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
new file mode 100644
index 0000000..7fb8b6c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.DrillConfigIterator;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Iterables;
+
+public class SystemTablePlugin extends AbstractStoragePlugin{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePlugin.class);
+
+  private final DrillbitContext context;
+  private final String name;
+
+  public SystemTablePlugin(SystemTablePluginConfig configuration, DrillbitContext context, String name){
+    this.context = context;
+    this.name = name;
+  }
+
+  private SystemSchema schema = new SystemSchema();
+
+  @Override
+  public StoragePluginConfig getConfig() {
+    return SystemTablePluginConfig.INSTANCE;
+  }
+
+  @Override
+  public void registerSchemas(DrillUser user, SchemaPlus parent) {
+    parent.add(schema.getName(), schema);
+  }
+
+  public Iterator<Object> getRecordIterator(FragmentContext context, SystemTable table){
+    switch(table){
+    case DRILLBITS:
+      return new DrillbitIterator(context);
+    case OPTION:
+
+      return Iterables.concat((Iterable<Object>)(Object) new DrillConfigIterator(context.getConfig()), //
+          context.getOptions()).iterator();
+    default:
+      throw new UnsupportedOperationException("Unable to create record iterator for table: " + table.getTableName());
+    }
+  }
+
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+    SystemTable table = selection.getWith(context.getConfig(), SystemTable.class);
+    return new SystemTableScan(table, this);
+  }
+
+  private class SystemSchema extends AbstractSchema{
+
+    private Set<String> tableNames;
+
+    public SystemSchema() {
+      super("sys");
+      Set<String> names = Sets.newHashSet();
+      for(SystemTable t : SystemTable.values()){
+        names.add(t.getTableName());
+      }
+      this.tableNames = ImmutableSet.copyOf(names);
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return tableNames;
+    }
+
+
+    @Override
+    public DrillTable getTable(String name) {
+      for(SystemTable table : SystemTable.values()){
+        if(table.getTableName().equalsIgnoreCase(name)){
+          return new StaticDrillTable(table.getType(), SystemTablePlugin.this.name, SystemTablePlugin.this, table);
+        }
+      }
+      return null;
+
+    }
+
+  }
+}
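
SystemTablePlugin ties the pieces together: it contributes a "sys" schema whose table names come from the SystemTable enum, hands the planner a SystemTableScan from getPhysicalScan(), and supplies the record iterator at execution time, so queries such as SELECT * FROM sys.drillbits or SELECT * FROM sys.options resolve against these POJO-backed tables. A short sketch of the execution-side entry point (fragmentContext assumed available):

    // Sketch: the same call SystemTableBatchCreator makes for a "sys" scan.
    Iterator<Object> rows = plugin.getRecordIterator(fragmentContext, SystemTable.DRILLBITS);
    // For SystemTable.OPTION the iterator concatenates boot-time config values
    // (DrillConfigIterator) with whatever context.getOptions() exposes, i.e. the
    // new system/session option values this commit introduces.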

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
new file mode 100644
index 0000000..bca9881
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePluginConfig.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+public class SystemTablePluginConfig implements StoragePluginConfig{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTablePluginConfig.class);
+
+  String name = "system-tables";
+
+  public static SystemTablePluginConfig INSTANCE = new SystemTablePluginConfig();
+
+  private SystemTablePluginConfig(){
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
new file mode 100644
index 0000000..9a745ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import parquet.org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("sys")
+public class SystemTableScan extends AbstractGroupScan implements SubScan{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);
+
+  private final SystemTable table;
+  private final SystemTablePlugin plugin;
+
+  @JsonCreator
+  public SystemTableScan( //
+      @JsonProperty("table") SystemTable table, //
+      @JacksonInject StoragePluginRegistry engineRegistry //
+      ) throws IOException, ExecutionSetupException {
+    this.table = table;
+    this.plugin = (SystemTablePlugin) engineRegistry.getPlugin(SystemTablePluginConfig.INSTANCE);
+  }
+
+  public SystemTableScan(SystemTable table, SystemTablePlugin plugin){
+    this.table = table;
+    this.plugin = plugin;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1,1,1,1);
+  }
+
+  @Override
+  public Size getSize() {
+    return new Size(100,1);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    return this;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+    return this;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public long getInitialAllocation() {
+    return initialAllocation;
+  }
+
+  @Override
+  public long getMaxAllocation() {
+    return maxAllocation;
+  }
+
+  @Override
+  public String getDigest() {
+    return "SystemTableScan: " + table.name();
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
+  public SystemTable getTable() {
+    return table;
+  }
+
+  public SystemTablePlugin getPlugin() {
+    return plugin;
+  }
+
+
+}
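
SystemTableScan is deliberately trivial: it is its own SubScan, its parallelization width is pinned to 1, and clone(), getSpecificScan() and getNewWithChildren() all return this. The @JsonTypeName("sys") annotation plus the @JsonCreator constructor let a serialized physical plan name the table by its enum constant and recover the plugin from the injected StoragePluginRegistry. A rough sketch of those invariants (checked exceptions elided; registry assumed available):

    SystemTableScan scan = new SystemTableScan(SystemTable.DRILLBITS, registry);
    assert scan.getMaxParallelizationWidth() == 1;                   // always one fragment
    assert scan.getSpecificScan(0) == scan;                          // group scan doubles as sub-scan
    assert scan.clone(Collections.<SchemaPath>emptyList()) == scan;  // column push-down is a no-op here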

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 835adad..e7accba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -49,21 +49,21 @@ import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 
 public class ControlHandlerImpl implements ControlMessageHandler {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-  
+
   private final WorkerBee bee;
-  
+
   public ControlHandlerImpl(WorkerBee bee) {
     super();
     this.bee = bee;
   }
 
-  
+
   @Override
   public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
     if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received bit com message of type {}", rpcType);
 
     switch (rpcType) {
-    
+
     case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
       FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
       cancelFragment(handle);
@@ -80,28 +80,28 @@ public class ControlHandlerImpl implements ControlMessageHandler {
         startNewRemoteFragment(fragment);
         return DataRpcConfig.OK;
 
-      } catch (OutOfMemoryException e) {
-        logger.error("Failure while attempting to start remote fragment.", fragment, e);
+      } catch (ExecutionSetupException e) {
+        logger.error("Failure while attempting to start remote fragment.", fragment);
         return new Response(RpcType.ACK, Acks.FAIL);
       }
-      
+
     default:
       throw new RpcException("Not yet supported.");
     }
 
   }
-  
-  
-  
+
+
+
   /* (non-Javadoc)
    * @see org.apache.drill.exec.work.batch.BitComHandler#startNewRemoteFragment(org.apache.drill.exec.proto.ExecProtos.PlanFragment)
    */
   @Override
-  public void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException{
+  public void startNewRemoteFragment(PlanFragment fragment) throws ExecutionSetupException{
     logger.debug("Received remote fragment start instruction", fragment);
     FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, new FunctionImplementationRegistry(bee.getContext().getConfig()));
     ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman());
-    
+
     NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
     try{
       FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
@@ -117,15 +117,15 @@ public class ControlHandlerImpl implements ControlMessageHandler {
       listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e);
     } catch (OutOfMemoryError t) {
       if(t.getMessage().startsWith("Direct buffer")){
-        listener.fail(fragment.getHandle(), "Failure due to error", t);  
+        listener.fail(fragment.getHandle(), "Failure due to error", t);
       }else{
         throw t;
       }
-      
+
     }
-    
+
   }
-  
+
   /* (non-Javadoc)
    * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
    */
@@ -141,10 +141,10 @@ public class ControlHandlerImpl implements ControlMessageHandler {
       FragmentExecutor runner = bee.getFragmentRunner(handle);
       if(runner != null) runner.cancel();
     }
-    
+
     return Acks.OK;
   }
-  
-  
-  
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 95f2dc6..d00478b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
 
 import io.netty.buffer.ByteBuf;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -26,14 +27,13 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
-import org.apache.drill.exec.work.fragment.FragmentManager;
 
 public interface ControlMessageHandler {
 
   public abstract Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
       throws RpcException;
 
-  public abstract void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException;
+  public abstract void startNewRemoteFragment(PlanFragment fragment) throws OutOfMemoryException, ExecutionSetupException;
 
   public abstract Ack cancelFragment(FragmentHandle handle);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6027c44..e73ddde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -21,8 +21,11 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -31,7 +34,9 @@ import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.DistributedMultiMap;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.QueryContext;
@@ -49,6 +54,7 @@ import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
@@ -186,6 +192,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
+
   private void parseAndRunLogicalPlan(String json) {
 
     try {
@@ -260,6 +267,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
   }
 
+
   private void runPhysicalPlan(PhysicalPlan plan) {
 
     if(plan.getProperties().resultMode != ResultMode.EXEC){
@@ -280,7 +288,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     SimpleParallelizer parallelizer = new SimpleParallelizer();
 
     try {
-      QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
+      QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
               context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH),
               context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT));
 
@@ -329,7 +337,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
     if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
-    return new BasicOptimizer(DrillConfig.create(), context).optimize(new BasicOptimizer.BasicOptimizationContext(), plan);
+    return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(new BasicOptimizer.BasicOptimizationContext(context), plan);
   }
 
   public QueryResult getResult(UserClientConnection connection, RequestResults req) {

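Two things change in Foreman: SimpleParallelizer.getFragments() now receives the session's option list (presumably so the effective options can travel with each generated PlanFragment via the new options_json field further down), and BasicOptimizer is constructed with the initiating client connection plus an optimization context that carries the QueryContext. A sketch of the new getFragments() shape, assuming parallelizer, localEndpoint, queryId, activeEndpoints, reader, rootFragment and planningSet are set up as in runPhysicalPlan() above; the empty OptionList stands in for context.getOptions().getSessionOptionList() and the literal widths stand in for the config values:

    import org.apache.drill.exec.server.options.OptionList;
    import org.apache.drill.exec.work.QueryWorkUnit;

    // Sketch only: mirrors the call in Foreman.runPhysicalPlan() with explicit placeholder values.
    OptionList options = new OptionList();            // a real session supplies its current option values here
    QueryWorkUnit work = parallelizer.getFragments(options, localEndpoint, queryId, activeEndpoints,
        reader, rootFragment, planningSet, 10 /* global max width */, 5 /* max width per endpoint */);
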
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index c8f2021..5cad658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -60,8 +60,8 @@ public class NonRootFragmentManager implements FragmentManager {
       this.context.setBuffers(buffers);
       this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman()));
       this.reader = context.getPlanReader();
-      
-    }catch(IOException e){
+
+    }catch(ExecutionSetupException | IOException e){
       throw new FragmentSetupException("Failure while decoding fragment.", e);
     }
   }
@@ -92,7 +92,7 @@ public class NonRootFragmentManager implements FragmentManager {
         return null;
       }
     }
-    
+
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 2b8779a..4f5e2e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
@@ -74,4 +75,8 @@ public class UserWorker{
   public SchemaFactory getSchemaFactory(){
     return bee.getContext().getSchemaFactory();
   }
+
+  public OptionManager getSystemOptions(){
+    return bee.getContext().getOptionManager();
+  }
 }

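The new accessor gives the user-facing RPC layer a handle on the Drillbit-wide option manager, which is where ALTER SYSTEM values live; a new user session would start from those defaults. A minimal sketch, assuming a UserWorker instance named worker:

    import org.apache.drill.exec.server.options.OptionManager;

    // Sketch only: obtain the system-level options exposed by this change. How a UserSession
    // consumes them (e.g. as the fallback for session overrides) is defined elsewhere in this commit.
    OptionManager systemOptions = worker.getSystemOptions();
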
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 948c763..6770ee7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -39,6 +39,9 @@ import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.junit.Rule;
 import org.junit.rules.TestRule;
@@ -51,7 +54,7 @@ import com.google.common.io.Resources;
 public class PlanningBase extends ExecTest{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanningBase.class);
 
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(30000);
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(10000);
 
   @Mocked DrillbitContext dbContext;
   @Mocked QueryContext context;
@@ -68,6 +71,10 @@ public class PlanningBase extends ExecTest{
     final DistributedCache cache = new LocalCache();
     cache.run();
 
+    final SystemOptionManager opt = new SystemOptionManager(cache);
+    opt.init();
+    final OptionManager sess = new SessionOptionManager(opt);
+
     new NonStrictExpectations() {
       {
         dbContext.getMetrics();
@@ -76,6 +83,8 @@ public class PlanningBase extends ExecTest{
         result = new TopLevelAllocator();
         dbContext.getConfig();
         result = config;
+        dbContext.getOptionManager();
+        result = opt;
         dbContext.getCache();
         result = cache;
       }
@@ -88,6 +97,7 @@ public class PlanningBase extends ExecTest{
     registry.getSchemaFactory().registerSchemas(null, root);
 
 
+
     new NonStrictExpectations() {
       {
         context.getNewDefaultSchema();
@@ -97,13 +107,15 @@ public class PlanningBase extends ExecTest{
         context.getFunctionRegistry();
         result = functionRegistry;
         context.getSession();
-        result = new UserSession(null, null);
+        result = new UserSession(null, null, null);
         context.getCurrentEndpoint();
         result = DrillbitEndpoint.getDefaultInstance();
         context.getActiveEndpoints();
         result = ImmutableList.of(DrillbitEndpoint.getDefaultInstance());
         context.getPlannerSettings();
-        result = new PlannerSettings();
+        result = new PlannerSettings(sess);
+        context.getOptions();
+        result = sess;
         context.getConfig();
         result = config;
         context.getCache();

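Instead of passing nulls, the planning test now builds the real option stack: a SystemOptionManager whose values are persisted in the (local) DistributedCache, wrapped by a SessionOptionManager that layers per-session overrides on top, with both handed back from the mocked getOptionManager()/getOptions() calls. The same layering on its own, as a sketch that assumes a running DistributedCache such as the LocalCache used above:

    import org.apache.drill.exec.cache.DistributedCache;
    import org.apache.drill.exec.server.options.OptionManager;
    import org.apache.drill.exec.server.options.SessionOptionManager;
    import org.apache.drill.exec.server.options.SystemOptionManager;

    // Sketch only: system-wide option values are cache-backed; the session manager presumably
    // falls back to them when a session has not overridden an option.
    OptionManager buildSessionOptions(DistributedCache cache) throws Exception {
      SystemOptionManager system = new SystemOptionManager(cache);
      system.init();                               // initialize/load system defaults from the cache
      return new SessionOptionManager(system);
    }
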
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
index 1ccb65c..a459bef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
@@ -20,10 +20,11 @@ package org.apache.drill;
 import org.junit.Ignore;
 import org.junit.Test;
 
+@Ignore // DRILL-648
 public class TestTpchSingleMode extends BaseTestQuery{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchSingleMode.class);
 
-  private static final String SINGLE_MODE = "ALTER SESSION SET NO_EXCHANGES = true;";
+  private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true;";
 
 
   private void testSingleMode(String fileName) throws Exception{

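The session option that turns off exchanges is now addressed by its namespaced name, replacing the old NO_EXCHANGES spelling, and the back-quotes are needed because the identifier contains a dot. A hypothetical helper in the same style as this test class, assuming the test(String) helper from BaseTestQuery:

    // Sketch only: run an arbitrary query with exchanges disabled for the session.
    private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true;";

    private void runSingleMode(String query) throws Exception {
      test(SINGLE_MODE + query);    // the ALTER SESSION statement applies before the query in the same submission
    }
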
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 4b2378d..41b00da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -109,11 +109,11 @@ public class TestOptiqPlans extends ExecTest {
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
     DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus);
-    QueryContext qc = new QueryContext(new UserSession(null, null), QueryId.getDefaultInstance(), bitContext);
+    QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
-    PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc).optimize(
-        new BasicOptimizer.BasicOptimizationContext(), plan);
+    PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
+
 
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
     FragmentContext fctxt = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 37e350e..1b38dce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.junit.Test;
 
@@ -34,17 +35,17 @@ import com.google.common.collect.Lists;
 
 public class TestFragmentChecker extends PopUnitTestBase{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFragmentChecker.class);
-  
-  
+
+
   @Test
   public void checkSimpleExchangePlan() throws Exception{
     print("/physical_double_exchange.json", 2, 3);
 
   }
-  
-  
+
+
   private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws Exception{
-    
+
     System.out.println(String.format("=================Building plan fragments for [%s].  Allowing %d total Drillbits.==================", fragmentFile, bitCount));
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
     Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
@@ -54,27 +55,27 @@ public class TestFragmentChecker extends PopUnitTestBase{
     DrillbitEndpoint localBit = null;
     for(int i =0; i < bitCount; i++){
       DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setControlPort(1234+i).build();
-      if(i ==0) localBit = b1; 
+      if(i ==0) localBit = b1;
       endpoints.add(b1);
     }
-    
-    
-    QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
+
+
+    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
-    
+
     System.out.print(qwu.getRootFragment().getFragmentJson());
-    
-    
+
+
     for(PlanFragment f : qwu.getFragments()){
       System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(), f.getHandle().getMinorFragmentId()));
       System.out.print(f.getFragmentJson());
     }
-    
+
     //assertEquals(exepectedFragmentCount, qwu.getFragments().size());
 
     logger.debug("Planning Set {}", planningSet);
   }
-  
+
   @Test
   public void validateSingleExchangeFragment() throws Exception{
     print("/physical_single_exchange.json", 1, 2);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
new file mode 100644
index 0000000..c522da8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestOptions extends BaseTestQuery{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOptions.class);
+
+  @Test
+  public void testDrillbits() throws Exception{
+    test("select * from sys.drillbits;");
+  }
+
+  @Test
+  public void testOptions() throws Exception{
+    test(
+        "select * from sys.options;" +
+        "ALTER SYSTEM set `planner.disable_exchanges` = true;" +
+        "select * from sys.options;" +
+        "ALTER SESSION set `planner.disable_exchanges` = true;" +
+        "select * from sys.options;"
+        );
+  }
+}

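The test exercises both scopes introduced by this change: ALTER SYSTEM writes a Drillbit-wide default (backed by the distributed cache), while ALTER SESSION only overrides the value for the current session, and the interleaved selects from sys.options make both visible in the query output. A possible follow-up in the same style (illustrative only, not part of this commit):

    @Test
    public void sessionOverrideOnly() throws Exception {
      // Hypothetical: a session-level change should show up in sys.options for this session
      // without modifying the system-wide default.
      test(
          "ALTER SESSION set `planner.disable_exchanges` = true;" +
          "select * from sys.options;"
          );
    }
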
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/resources/server/options_session_check.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/server/options_session_check.json b/exec/java-exec/src/test/resources/server/options_session_check.json
new file mode 100644
index 0000000..6cb80fd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/server/options_session_check.json
@@ -0,0 +1,20 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"options-reader-group-scan"
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "screen"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/test/resources/server/options_set.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/server/options_set.json b/exec/java-exec/src/test/resources/server/options_set.json
new file mode 100644
index 0000000..dda35fc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/server/options_set.json
@@ -0,0 +1,24 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        },
+        options : {
+            &REPLACED_IN_TEST&
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"options-reader-group-scan"
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "screen"
+        }
+    ]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index f70ddca..8130a33 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -272,6 +272,8 @@ public class TestJdbcQuery extends JdbcTest{
         "TABLE_SCHEMA=hive.default; TABLE_NAME=kv\n" +
         "TABLE_SCHEMA=hive.db1; TABLE_NAME=kv_db1\n" +
         "TABLE_SCHEMA=hive; TABLE_NAME=kv\n" +
+        "TABLE_SCHEMA=sys; TABLE_NAME=drillbits\n" +
+        "TABLE_SCHEMA=sys; TABLE_NAME=options\n" +
         "TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=VIEWS\n" +
         "TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=COLUMNS\n" +
         "TABLE_SCHEMA=INFORMATION_SCHEMA; TABLE_NAME=TABLES\n" +
@@ -322,6 +324,7 @@ public class TestJdbcQuery extends JdbcTest{
         "SCHEMA_NAME=dfs\n" +
         "SCHEMA_NAME=cp.default\n" +
         "SCHEMA_NAME=cp\n" +
+        "SCHEMA_NAME=sys\n" +
         "SCHEMA_NAME=INFORMATION_SCHEMA\n";
 
     JdbcAssert.withNoDefaultSchema().sql("SHOW DATABASES").returns(expected);

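The two added rows come from the new sys schema (with its drillbits and options tables), which is why the expected SHOW TABLES and SHOW DATABASES output grows. The tables are reachable over JDBC like any other schema; a minimal sketch against an embedded Drillbit, where the connection URL jdbc:drill:zk=local is an assumption about the local test setup:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    // Sketch only: read the options table through plain JDBC.
    try (Connection c = DriverManager.getConnection("jdbc:drill:zk=local");
         Statement s = c.createStatement();
         ResultSet rs = s.executeQuery("select * from sys.options")) {
      while (rs.next()) {
        System.out.println(rs.getString(1));    // first column of each option row
      }
    }
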
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index fe37521..a1cf6f8 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -3155,6 +3155,21 @@ public final class BitControl {
      * <code>optional int32 time_zone = 16;</code>
      */
     int getTimeZone();
+
+    // optional string options_json = 17;
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    boolean hasOptionsJson();
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    java.lang.String getOptionsJson();
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    com.google.protobuf.ByteString
+        getOptionsJsonBytes();
   }
   /**
    * Protobuf type {@code exec.bit.control.PlanFragment}
@@ -3309,6 +3324,11 @@ public final class BitControl {
               timeZone_ = input.readInt32();
               break;
             }
+            case 138: {
+              bitField0_ |= 0x00004000;
+              optionsJson_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3648,6 +3668,49 @@ public final class BitControl {
       return timeZone_;
     }
 
+    // optional string options_json = 17;
+    public static final int OPTIONS_JSON_FIELD_NUMBER = 17;
+    private java.lang.Object optionsJson_;
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    public boolean hasOptionsJson() {
+      return ((bitField0_ & 0x00004000) == 0x00004000);
+    }
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    public java.lang.String getOptionsJson() {
+      java.lang.Object ref = optionsJson_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          optionsJson_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string options_json = 17;</code>
+     */
+    public com.google.protobuf.ByteString
+        getOptionsJsonBytes() {
+      java.lang.Object ref = optionsJson_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        optionsJson_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       handle_ = org.apache.drill.exec.proto.ExecProtos.FragmentHandle.getDefaultInstance();
       networkCost_ = 0F;
@@ -3663,6 +3726,7 @@ public final class BitControl {
       queryStartTime_ = 0L;
       credentials_ = org.apache.drill.exec.proto.UserBitShared.UserCredentials.getDefaultInstance();
       timeZone_ = 0;
+      optionsJson_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3718,6 +3782,9 @@ public final class BitControl {
       if (((bitField0_ & 0x00002000) == 0x00002000)) {
         output.writeInt32(16, timeZone_);
       }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        output.writeBytes(17, getOptionsJsonBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -3783,6 +3850,10 @@ public final class BitControl {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(16, timeZone_);
       }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(17, getOptionsJsonBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3947,6 +4018,8 @@ public final class BitControl {
         bitField0_ = (bitField0_ & ~0x00001000);
         timeZone_ = 0;
         bitField0_ = (bitField0_ & ~0x00002000);
+        optionsJson_ = "";
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
 
@@ -4047,6 +4120,10 @@ public final class BitControl {
           to_bitField0_ |= 0x00002000;
         }
         result.timeZone_ = timeZone_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00004000;
+        }
+        result.optionsJson_ = optionsJson_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4107,6 +4184,11 @@ public final class BitControl {
         if (other.hasTimeZone()) {
           setTimeZone(other.getTimeZone());
         }
+        if (other.hasOptionsJson()) {
+          bitField0_ |= 0x00004000;
+          optionsJson_ = other.optionsJson_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5021,6 +5103,80 @@ public final class BitControl {
         return this;
       }
 
+      // optional string options_json = 17;
+      private java.lang.Object optionsJson_ = "";
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public boolean hasOptionsJson() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public java.lang.String getOptionsJson() {
+        java.lang.Object ref = optionsJson_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          optionsJson_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public com.google.protobuf.ByteString
+          getOptionsJsonBytes() {
+        java.lang.Object ref = optionsJson_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          optionsJson_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public Builder setOptionsJson(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00004000;
+        optionsJson_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public Builder clearOptionsJson() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        optionsJson_ = getDefaultInstance().getOptionsJson();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string options_json = 17;</code>
+       */
+      public Builder setOptionsJsonBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00004000;
+        optionsJson_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.control.PlanFragment)
     }
 
@@ -5753,7 +5909,7 @@ public final class BitControl {
       "shared.DrillPBError\022\024\n\014running_time\030\t \001(" +
       "\003\"k\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAIT" +
       "ING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHE" +
-      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\"\250\003\n\014PlanF" +
+      "D\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\"\276\003\n\014PlanF" +
       "ragment\022(\n\006handle\030\001 \001(\0132\030.exec.bit.Fragm",
       "entHandle\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_c" +
       "ost\030\005 \001(\002\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_c" +
@@ -5764,16 +5920,17 @@ public final class BitControl {
       "\003:\01020000000\022\034\n\007mem_max\030\r \001(\003:\0132000000000" +
       "0\022\030\n\020query_start_time\030\016 \001(\003\0221\n\013credentia" +
       "ls\030\017 \001(\0132\034.exec.shared.UserCredentials\022\021" +
-      "\n\ttime_zone\030\020 \001(\005\"f\n\017WorkQueueStatus\022(\n\010",
-      "endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024" +
-      "\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(" +
-      "\003*\332\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n" +
-      "\007GOODBYE\020\002\022\033\n\027REQ_INIATILIZE_FRAGMENT\020\003\022" +
-      "\027\n\023REQ_CANCEL_FRAGMENT\020\006\022\027\n\023REQ_FRAGMENT" +
-      "_STATUS\020\007\022\022\n\016REQ_BIT_STATUS\020\010\022\030\n\024RESP_FR" +
-      "AGMENT_HANDLE\020\t\022\030\n\024RESP_FRAGMENT_STATUS\020" +
-      "\n\022\023\n\017RESP_BIT_STATUS\020\013B+\n\033org.apache.dri" +
-      "ll.exec.protoB\nBitControlH\001"
+      "\n\ttime_zone\030\020 \001(\005\022\024\n\014options_json\030\021 \001(\t\"",
+      "f\n\017WorkQueueStatus\022(\n\010endpoint\030\001 \001(\0132\026.e" +
+      "xec.DrillbitEndpoint\022\024\n\014queue_length\030\002 \001" +
+      "(\005\022\023\n\013report_time\030\003 \001(\003*\332\001\n\007RpcType\022\r\n\tH" +
+      "ANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\033\n\027REQ_" +
+      "INIATILIZE_FRAGMENT\020\003\022\027\n\023REQ_CANCEL_FRAG" +
+      "MENT\020\006\022\027\n\023REQ_FRAGMENT_STATUS\020\007\022\022\n\016REQ_B" +
+      "IT_STATUS\020\010\022\030\n\024RESP_FRAGMENT_HANDLE\020\t\022\030\n" +
+      "\024RESP_FRAGMENT_STATUS\020\n\022\023\n\017RESP_BIT_STAT" +
+      "US\020\013B+\n\033org.apache.drill.exec.protoB\nBit" +
+      "ControlH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5803,7 +5960,7 @@ public final class BitControl {
           internal_static_exec_bit_control_PlanFragment_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_control_PlanFragment_descriptor,
-              new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "Assignment", "LeafFragment", "Foreman", "MemInitial", "MemMax", "QueryStartTime", "Credentials", "TimeZone", });
+              new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "Assignment", "LeafFragment", "Foreman", "MemInitial", "MemMax", "QueryStartTime", "Credentials", "TimeZone", "OptionsJson", });
           internal_static_exec_bit_control_WorkQueueStatus_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_exec_bit_control_WorkQueueStatus_fieldAccessorTable = new

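BitControl.java is generated from BitControl.proto, so most of this hunk is mechanical: the new optional string gets field number 17 (wire key 138 = (17 << 3) | 2 for a length-delimited field), presence bit 0x00004000, the usual has/get/set/clear accessors on both the message and its Builder, and an updated descriptor string. A small sketch of the intended round trip when a fragment is shipped to another Drillbit; the "[]" payload is only a stand-in for the serialized option list:

    import org.apache.drill.exec.proto.BitControl.PlanFragment;

    // Foreman side (sketch): attach the effective options as JSON before sending the fragment.
    PlanFragment fragment = PlanFragment.newBuilder()
        .setOptionsJson("[]")                        // placeholder; real content is the serialized OptionList
        .build();

    // Receiving Drillbit (sketch): check for the field before trying to deserialize it.
    if (fragment.hasOptionsJson()) {
      String optionsJson = fragment.getOptionsJson();   // handed to the plan reader / option machinery
    }
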
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index a738646..77d7e9d 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -77,6 +77,7 @@ message PlanFragment {
   optional int64 query_start_time = 14; // start time of query in milliseconds
   optional exec.shared.UserCredentials credentials = 15;
   optional int32 time_zone = 16;
+  optional string options_json = 17;
 }
 
 message WorkQueueStatus{