You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/07 03:20:59 UTC

[5/6] DRILL-381: Implement SYSTEM and SESSION options.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
new file mode 100644
index 0000000..cdfcf60
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java.orig
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Collection;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
+import org.apache.drill.exec.store.StoragePlugin;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
+
+public class DrillbitContext {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+
+  private final BootStrapContext context;
+
+  private PhysicalPlanReader reader;
+  private final ClusterCoordinator coord;
+  private final DataConnectionCreator connectionsPool;
+  private final DistributedCache cache;
+  private final DrillbitEndpoint endpoint;
+  private final StoragePluginRegistry storagePlugins;
+  private final OperatorCreatorRegistry operatorCreatorRegistry;
+  private final Controller controller;
+  private final WorkEventBus workBus;
+  private final FunctionImplementationRegistry functionRegistry;
+<<<<<<< HEAD
+
+=======
+  private final DistributedGlobalOptions globalDrillOptions;
+  
+>>>>>>> Drill 381 - implementation of session and global options.
+  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
+    super();
+    Preconditions.checkNotNull(endpoint);
+    Preconditions.checkNotNull(context);
+    Preconditions.checkNotNull(controller);
+    Preconditions.checkNotNull(connectionsPool);
+    this.workBus = workBus;
+    this.controller = controller;
+    this.context = context;
+    this.coord = coord;
+    this.connectionsPool = connectionsPool;
+    this.cache = cache;
+    this.endpoint = endpoint;
+    this.storagePlugins = new StoragePluginRegistry(this);
+    this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins);
+    this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
+    this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+    this.globalDrillOptions = new DistributedGlobalOptions(this.cache);
+  }
+
+<<<<<<< HEAD
+=======
+  
+>>>>>>> Drill 381 - implementation of session and global options.
+  public FunctionImplementationRegistry getFunctionImplementationRegistry() {
+    return functionRegistry;
+  }
+
+  public WorkEventBus getWorkBus(){
+    return workBus;
+  }
+<<<<<<< HEAD
+=======
+  
+  public DistributedGlobalOptions getGlobalDrillOptions() {
+    return globalDrillOptions;
+  }
+>>>>>>> Drill 381 - implementation of session and global options.
+
+  public DrillbitEndpoint getEndpoint(){
+    return endpoint;
+  }
+
+  public DrillConfig getConfig() {
+    return context.getConfig();
+  }
+
+  public Collection<DrillbitEndpoint> getBits(){
+    return coord.getAvailableEndpoints();
+  }
+
+  public BufferAllocator getAllocator(){
+    return context.getAllocator();
+  }
+
+  public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+    return operatorCreatorRegistry;
+  }
+
+  public StoragePluginRegistry getStorage(){
+    return this.storagePlugins;
+  }
+
+  public NioEventLoopGroup getBitLoopGroup(){
+    return context.getBitLoopGroup();
+  }
+
+
+  public DataConnectionCreator getDataConnectionsPool(){
+    return connectionsPool;
+  }
+
+  public Controller getController(){
+    return controller;
+  }
+
+  public MetricRegistry getMetrics(){
+    return context.getMetrics();
+  }
+
+  public DistributedCache getCache(){
+    return cache;
+  }
+
+  public PhysicalPlanReader getPlanReader(){
+    return reader;
+  }
+
+  public DrillSchemaFactory getSchemaFactory(){
+    return storagePlugins.getSchemaFactory();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
new file mode 100644
index 0000000..e4b03d3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+
+import com.typesafe.config.ConfigValue;
+
+public class DrillConfigIterator implements Iterable<OptionValue> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfigIterator.class);
+
+  DrillConfig c;
+  public DrillConfigIterator(DrillConfig c){
+    this.c = c;
+  }
+
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return new Iter(c);
+  }
+
+  public class Iter implements Iterator<OptionValue>{
+
+    Iterator<Entry<String, ConfigValue>> entries;
+    public Iter(DrillConfig c){
+      entries = c.entrySet().iterator();
+    }
+    @Override
+    public boolean hasNext() {
+      return entries.hasNext();
+    }
+
+    @Override
+    public OptionValue next() {
+      Entry<String, ConfigValue> e = entries.next();
+      OptionValue v = new OptionValue();
+      v.name = e.getKey();
+      ConfigValue cv = e.getValue();
+      v.type = OptionType.BOOT;
+      switch(cv.valueType()){
+      case BOOLEAN:
+        v.kind = Kind.BOOLEAN;
+        v.bool_val = (Boolean) cv.unwrapped();
+        break;
+      case LIST:
+      case OBJECT:
+      case STRING:
+        v.string_val = cv.render();
+        break;
+      case NUMBER:
+        v.kind = Kind.LONG;
+        v.num_val = ((Number)cv.unwrapped()).longValue();
+        break;
+      }
+      return v;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
new file mode 100644
index 0000000..e9620db
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionsManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class FragmentOptionsManager implements OptionManager{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionsManager.class);
+
+  ImmutableMap<String, OptionValue> options;
+  OptionManager systemOptions;
+
+  public FragmentOptionsManager(OptionManager systemOptions, OptionList options){
+    Map<String, OptionValue> tmp = Maps.newHashMap();
+    for(OptionValue v : options){
+      tmp.put(v.name, v);
+    }
+    this.options = ImmutableMap.copyOf(tmp);
+    this.systemOptions = systemOptions;
+  }
+
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return Iterables.concat(systemOptions, options.values()).iterator();
+  }
+
+  @Override
+  public OptionValue getOption(String name) {
+    return null;
+  }
+
+  @Override
+  public void setOption(OptionValue value) throws SetOptionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setOption(String name, SqlLiteral literal) throws SetOptionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionAdmin getAdmin() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionManager getSystemManager() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionList getSessionOptionList() {
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
new file mode 100644
index 0000000..b6d4f8c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.ArrayList;
+
+public class OptionList extends ArrayList<OptionValue>{
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
new file mode 100644
index 0000000..3833833
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.eigenbase.sql.SqlLiteral;
+
+public interface OptionManager extends Iterable<OptionValue>{
+  public OptionValue getOption(String name);
+  public void setOption(OptionValue value) throws SetOptionException;
+  public void setOption(String name, SqlLiteral literal) throws SetOptionException;
+  public OptionAdmin getAdmin();
+  public OptionManager getSystemManager();
+  public OptionList getSessionOptionList();
+
+  public interface OptionAdmin{
+    public void registerOptionType(OptionValidator validator);
+    public void validate(OptionValue v) throws SetOptionException;
+    public OptionValue validate(String name, SqlLiteral value) throws SetOptionException;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
new file mode 100644
index 0000000..5b90ba5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server.options;
+
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.eigenbase.sql.SqlLiteral;
+
+/**
+ * Validates the values provided to Drill options.
+ *
+ * @param <E>
+ */
+public abstract class OptionValidator {
+
+  // Stored here as well as in the option static class to allow insertion of option optionName into
+  // the error messages produced by the validator
+  private String optionName;
+
+  public OptionValidator(String optionName){
+    this.optionName = optionName;
+  }
+
+  /**
+   * This method determines if a given value is a valid setting for an option. For options that support some
+   * ambiguity in their settings, such as case-insensitivity for string options, this method returns a modified
+   * version of the passed value that is considered the standard format of the option that should be used for
+   * system-internal representation.
+   *
+   * @param value - the value to validate
+   * @return - the value requested, in its standard format to be used for representing the value within Drill
+   *            Example: all lower case values for strings, to avoid ambiguities in how values are stored
+   *            while allowing some flexibility for users
+   * @throws ExpressionParsingException - message to describe error with value, including range or list of expected values
+   */
+  public abstract OptionValue validate(SqlLiteral value) throws ExpressionParsingException;
+
+  public String getOptionName() {
+    return optionName;
+  }
+
+  public String getDefaultString(){
+    return null;
+  }
+
+
+  public abstract OptionValue getDefault();
+
+  public abstract void validate(OptionValue v) throws ExpressionParsingException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
new file mode 100644
index 0000000..7b4f7f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import org.apache.drill.exec.cache.JacksonSerializable;
+
+import com.google.common.base.Preconditions;
+
+
+public class OptionValue extends JacksonSerializable {
+
+  public static enum OptionType {
+    BOOT, SYSTEM, SESSION
+  }
+
+  public static enum Kind {
+    BOOLEAN, LONG, STRING, DOUBLE
+  }
+
+  public String name;
+  public Kind kind;
+  public OptionType type;
+  public Long num_val;
+  public String string_val;
+  public Boolean bool_val;
+  public Double float_val;
+
+  public static OptionValue createLong(OptionType type, String name, long val) {
+    return new OptionValue(Kind.LONG, type, name, val, null, null, null);
+  }
+
+  public static OptionValue createBoolean(OptionType type, String name, boolean bool) {
+    return new OptionValue(Kind.BOOLEAN, type, name, null, null, bool, null);
+  }
+
+  public static OptionValue createString(OptionType type, String name, String val) {
+    return new OptionValue(Kind.STRING, type, name, null, val, null, null);
+  }
+
+  public static OptionValue createDouble(OptionType type, String name, double val) {
+    return new OptionValue(Kind.DOUBLE, type, name, null, null, null, val);
+  }
+
+  public OptionValue(){}
+
+  private OptionValue(Kind kind, OptionType type, String name, Long num_val, String string_val, Boolean bool_val, Double float_val) {
+    super();
+    Preconditions.checkArgument(num_val != null || string_val != null || bool_val != null);
+    this.name = name;
+    this.kind = kind;
+    this.float_val = float_val;
+    this.type = type;
+    this.num_val = num_val;
+    this.string_val = string_val;
+    this.bool_val = bool_val;
+    this.type = type;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((bool_val == null) ? 0 : bool_val.hashCode());
+    result = prime * result + ((float_val == null) ? 0 : float_val.hashCode());
+    result = prime * result + ((kind == null) ? 0 : kind.hashCode());
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((num_val == null) ? 0 : num_val.hashCode());
+    result = prime * result + ((string_val == null) ? 0 : string_val.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    OptionValue other = (OptionValue) obj;
+    if (bool_val == null) {
+      if (other.bool_val != null)
+        return false;
+    } else if (!bool_val.equals(other.bool_val))
+      return false;
+    if (float_val == null) {
+      if (other.float_val != null)
+        return false;
+    } else if (!float_val.equals(other.float_val))
+      return false;
+    if (kind != other.kind)
+      return false;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    if (num_val == null) {
+      if (other.num_val != null)
+        return false;
+    } else if (!num_val.equals(other.num_val))
+      return false;
+    if (string_val == null) {
+      if (other.string_val != null)
+        return false;
+    } else if (!string_val.equals(other.string_val))
+      return false;
+    if (type != other.type)
+      return false;
+    return true;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
new file mode 100644
index 0000000..993cead
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class SessionOptionManager implements OptionManager{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
+
+  private Map<String, OptionValue> options = Maps.newConcurrentMap();
+  private OptionManager systemOptions;
+
+  public SessionOptionManager(OptionManager systemOptions) {
+    super();
+    this.systemOptions = systemOptions;
+  }
+
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return Iterables.concat(systemOptions, options.values()).iterator();
+  }
+
+  @Override
+  public OptionValue getOption(String name) {
+    OptionValue opt = options.get(name);
+    if(opt == null){
+      return systemOptions.getOption(name);
+    }else{
+      return opt;
+    }
+  }
+
+  @Override
+  public void setOption(OptionValue value) {
+    systemOptions.getAdmin().validate(value);
+    setValidatedOption(value);
+  }
+
+
+  @Override
+  public OptionList getSessionOptionList() {
+    OptionList list = new OptionList();
+    for(OptionValue o : options.values()){
+      list.add(o);
+    }
+    return list;
+  }
+
+  private void setValidatedOption(OptionValue value){
+    if(value.type == OptionType.SYSTEM){
+      systemOptions.setOption(value);
+    }else{
+      options.put(value.name, value);
+    }
+  }
+
+  @Override
+  public void setOption(String name, SqlLiteral literal) {
+    OptionValue val = systemOptions.getAdmin().validate(name,  literal);
+    val.type = OptionValue.OptionType.SESSION;
+    setValidatedOption(val);
+  }
+
+  @Override
+  public OptionAdmin getAdmin() {
+    return systemOptions.getAdmin();
+  }
+
+  @Override
+  public OptionManager getSystemManager() {
+    return systemOptions;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
new file mode 100644
index 0000000..dd698c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SetOptionException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Set;
+
+import javax.validation.ConstraintViolation;
+
+import org.apache.drill.common.exceptions.LogicalPlanParsingException;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalOperatorBase;
+
+public class SetOptionException extends LogicalPlanParsingException{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionException.class);
+
+  public SetOptionException() {
+    super();
+
+  }
+
+  public SetOptionException(LogicalOperator operator, Set<ConstraintViolation<LogicalOperatorBase>> violations) {
+    super(operator, violations);
+
+  }
+
+  public SetOptionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+
+  }
+
+  public SetOptionException(String message, Throwable cause) {
+    super(message, cause);
+
+  }
+
+  public SetOptionException(String message) {
+    super(message);
+
+  }
+
+  public SetOptionException(Throwable cause) {
+    super(cause);
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
new file mode 100644
index 0000000..98975e4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Maps;
+
+public class SystemOptionManager implements OptionManager{
+
+  private final OptionValidator[] VALIDATORS = {
+      PlannerSettings.EXCHANGE
+  };
+
+  private DistributedMap<OptionValue> options;
+  private SystemOptionAdmin admin;
+  private final ConcurrentMap<String, OptionValidator> knownOptions = Maps.newConcurrentMap();
+  private DistributedCache cache;
+
+  public SystemOptionManager(DistributedCache cache){
+    this.cache = cache;
+  }
+
+  public void init(){
+    this.options = cache.getNamedMap("system.options", OptionValue.class);
+    this.admin = new SystemOptionAdmin();
+  }
+
+  private class Iter implements Iterator<OptionValue>{
+    private Iterator<Map.Entry<String, OptionValue>> inner;
+
+    public Iter(Iterator<Map.Entry<String, OptionValue>> inner){
+      this.inner = inner;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return inner.hasNext();
+    }
+
+    @Override
+    public OptionValue next() {
+      return inner.next().getValue();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return new Iter(options.iterator());
+  }
+
+  @Override
+  public OptionValue getOption(String name) {
+    return options.get(name);
+  }
+
+  @Override
+  public void setOption(OptionValue value) {
+    admin.validate(value);
+    assert value.type == OptionType.SYSTEM;
+    options.put(value.name, value);
+  }
+
+  @Override
+  public void setOption(String name, SqlLiteral literal) {
+    OptionValue v = admin.validate(name, literal);
+    v.type = OptionValue.OptionType.SYSTEM;
+    options.put(name, v);
+  }
+
+  @Override
+  public OptionList getSessionOptionList() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OptionManager getSystemManager() {
+    return this;
+  }
+
+  @Override
+  public OptionAdmin getAdmin() {
+    return admin;
+  }
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class);
+
+
+  private class SystemOptionAdmin implements OptionAdmin{
+
+    public SystemOptionAdmin(){
+      for(OptionValidator v : VALIDATORS){
+        knownOptions.put(v.getOptionName(), v);
+        options.putIfAbsent(v.getOptionName(), v.getDefault());
+      }
+    }
+
+
+    @Override
+    public void registerOptionType(OptionValidator validator) {
+      if(null != knownOptions.putIfAbsent(validator.getOptionName(), validator) ){
+        throw new IllegalArgumentException("Only one option is allowed to be registered with name: " + validator.getOptionName());
+      }
+    }
+
+    @Override
+    public void validate(OptionValue v) throws SetOptionException {
+      OptionValidator validator = knownOptions.get(v.name);
+      if(validator == null) throw new SetOptionException("Unknown option " + v.name);
+      validator.validate(v);
+    }
+
+    @Override
+    public OptionValue validate(String name, SqlLiteral value) throws SetOptionException {
+      OptionValidator validator = knownOptions.get(name);
+      if(validator == null) throw new SetOptionException("Unknown option " + name);
+      return validator.validate(value);
+    }
+
+
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
new file mode 100644
index 0000000..0d681ea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+public class TypeValidators {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeValidators.class);
+
+  public static class PositiveLongValidator extends LongValidator {
+
+    private final long max;
+
+    public PositiveLongValidator(String name, long max, long def) {
+      super(name, def);
+      this.max = max;
+    }
+
+    @Override
+    public void extraValidate(OptionValue v) throws ExpressionParsingException {
+      if (v.num_val > max || v.num_val < 0)
+        throw new ExpressionParsingException(String.format("Option %s must be between %d and %d.", getOptionName(), 0,
+            max));
+    }
+  }
+
+  public static class BooleanValidator extends TypeValidator{
+    public BooleanValidator(String name, boolean def){
+      super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def));
+    }
+  }
+  public static class StringValidator extends TypeValidator{
+    public StringValidator(String name, String def){
+      super(name, Kind.LONG, OptionValue.createString(OptionType.SYSTEM, name, def));
+    }
+
+  }
+  public static class LongValidator extends TypeValidator{
+    public LongValidator(String name, long def){
+      super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def));
+    }
+  }
+  public static class DoubleValidator extends TypeValidator{
+
+    public DoubleValidator(String name, double def){
+      super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def));
+    }
+
+
+  }
+
+  public static abstract class TypeValidator extends OptionValidator {
+    final Kind kind;
+    private OptionValue defaultValue;
+
+    public TypeValidator(String name, Kind kind, OptionValue defValue) {
+      super(name);
+      this.kind = kind;
+      this.defaultValue = defValue;
+    }
+
+    @Override
+    public OptionValue getDefault() {
+      return defaultValue;
+    }
+
+    @Override
+    public OptionValue validate(SqlLiteral value) throws ExpressionParsingException {
+      OptionValue op = getPartialValue(getOptionName(), (OptionType) null, value);
+      validate(op);
+      return op;
+    }
+
+    @Override
+    public final void validate(OptionValue v) throws ExpressionParsingException {
+      if (v.kind != kind)
+        throw new ExpressionParsingException(String.format("Option %s must be of type %s but you tried to set to %s.",
+            getOptionName(), kind.name(), v.kind.name()));
+    }
+
+    public void extraValidate(OptionValue v) throws ExpressionParsingException {
+    }
+
+  }
+
+  public static OptionValue getPartialValue(String name, OptionType type, SqlLiteral literal) {
+    switch (literal.getTypeName()) {
+    case DECIMAL:
+    case DOUBLE:
+    case FLOAT:
+      return OptionValue.createDouble(type, name, ((BigDecimal) literal.getValue()).doubleValue());
+
+    case SMALLINT:
+    case TINYINT:
+    case BIGINT:
+    case INTEGER:
+      return OptionValue.createLong(type, name, ((BigDecimal) literal.getValue()).longValue());
+
+    case VARBINARY:
+    case VARCHAR:
+    case CHAR:
+      return OptionValue.createString(type, name, (String) literal.getValue());
+
+    case BOOLEAN:
+      return OptionValue.createBoolean(type, name, (Boolean) literal.getValue());
+
+    }
+
+    throw new ExpressionParsingException(String.format(
+        "Drill doesn't support set option expressions with literals of type %s.", literal.getTypeName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 4d88686..948c74f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import com.google.common.base.Preconditions;
 import net.hydromatic.linq4j.expressions.DefaultExpression;
 import net.hydromatic.linq4j.expressions.Expression;
 import net.hydromatic.optiq.SchemaPlus;
@@ -40,10 +39,9 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.util.PathScanner;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.rpc.user.DrillUser;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -51,9 +49,12 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
 import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePlugin;
+import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
 import org.eigenbase.relopt.RelOptRule;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
@@ -91,9 +92,9 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
       int i =0;
       for(Constructor<?> c : plugin.getConstructors()){
         Class<?>[] params = c.getParameterTypes();
-        if(params.length != 3 
-            || params[1] != DrillbitContext.class 
-            || !StoragePluginConfig.class.isAssignableFrom(params[0]) 
+        if(params.length != 3
+            || params[1] != DrillbitContext.class
+            || !StoragePluginConfig.class.isAssignableFrom(params[0])
             || params[2] != String.class){
           logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
           continue;
@@ -109,7 +110,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
     // create registered plugins defined in "storage-plugins.json"
     this.plugins = ImmutableMap.copyOf(createPlugins());
 
-    // query registered engines for optimizer rules and build the storage plugin RuleSet 
+    // query registered engines for optimizer rules and build the storage plugin RuleSet
     Builder<RelOptRule> setBuilder = ImmutableSet.builder();
     for (StoragePlugin plugin : this.plugins.values()) {
       Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
@@ -135,17 +136,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
         String pluginsData = Resources.toString(url, Charsets.UTF_8);
         plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
       }
-      DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
-      StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
-      if (cachedPluginsSerializable != null) {
-        cachedPlugins = cachedPluginsSerializable.getObj();
+      DistributedMap<StoragePlugins> map = context.getCache().getMap(StoragePlugins.class);
+      cachedPlugins = map.get("storage-plugins");
+      if (cachedPlugins != null) {
         logger.debug("Found cached storage plugin config: {}", cachedPlugins);
       } else {
         Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
         logger.debug("caching storage plugin config {}", plugins);
-        map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
-        cachedPluginsSerializable = map.get("storage-plugins");
-        cachedPlugins = cachedPluginsSerializable.getObj();
+        map.put("storage-plugins", plugins);
+        cachedPlugins = map.get("storage-plugins");
       }
       if(!(plugins == null || cachedPlugins.equals(plugins))) {
         logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
@@ -155,7 +154,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
     }catch(IOException e){
       throw new IllegalStateException("Failure while reading storage plugins data.", e);
     }
-    
+
     for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
       try{
         StoragePlugin plugin = create(config.getKey(), config.getValue());
@@ -165,14 +164,15 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
       }
     }
     activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
-    
+    activePlugins.put("sys", new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, "sys"));
+
     return activePlugins;
   }
 
   public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
     return plugins.get(registeredStoragePluginName);
   }
-  
+
   public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
     if(config instanceof NamedStoragePluginConfig){
       return plugins.get(((NamedStoragePluginConfig) config).name);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
new file mode 100644
index 0000000..2c75651
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java.orig
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import net.hydromatic.linq4j.expressions.DefaultExpression;
+import net.hydromatic.linq4j.expressions.Expression;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.RuleSet;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
+import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
+import org.eigenbase.relopt.RelOptRule;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.io.Resources;
+import org.apache.drill.exec.store.options.OptionValueStorageConfig;
+import org.apache.drill.exec.store.options.OptionValueStoragePlugin;
+
+
+public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
+
+  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+  private ImmutableMap<String, StoragePlugin> plugins;
+
+  private DrillbitContext context;
+  private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
+
+  private RuleSet storagePluginsRuleSet;
+
+  private static final Expression EXPRESSION = new DefaultExpression(Object.class);
+
+  public StoragePluginRegistry(DrillbitContext context) {
+    try{
+    this.context = context;
+    }catch(RuntimeException e){
+      logger.error("Failure while loading storage plugin registry.", e);
+      throw new RuntimeException("Faiure while reading and loading storage plugin configuration.", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void init() throws DrillbitStartupException {
+    DrillConfig config = context.getConfig();
+    Collection<Class<? extends StoragePlugin>> plugins = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage plugins {}", plugins);
+    for(Class<? extends StoragePlugin> plugin: plugins){
+      int i =0;
+      for(Constructor<?> c : plugin.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        if(params.length != 3 
+            || params[1] != DrillbitContext.class 
+            || !StoragePluginConfig.class.isAssignableFrom(params[0]) 
+            || params[2] != String.class){
+          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
+          continue;
+        }
+        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
+        i++;
+      }
+      if(i == 0){
+        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters of (StorangePluginConfig, Config)", plugin.getCanonicalName());
+      }
+    }
+
+    // create registered plugins defined in "storage-plugins.json"
+    this.plugins = ImmutableMap.copyOf(createPlugins());
+
+    // query registered engines for optimizer rules and build the storage plugin RuleSet 
+    Builder<RelOptRule> setBuilder = ImmutableSet.builder();
+    for (StoragePlugin plugin : this.plugins.values()) {
+      Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
+      if (rules != null && rules.size() > 0) {
+        setBuilder.addAll(rules);
+      }
+    }
+    this.storagePluginsRuleSet = DrillRuleSets.create(setBuilder.build());
+  }
+
+  private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+    /*
+     * Check if "storage-plugins.json" exists. Also check if "storage-plugins" object exists in Distributed Cache.
+     * If both exist, check that they are the same. If they differ, throw exception. If "storage-plugins.json" exists, but
+     * nothing found in cache, then add it to the cache. If neither are found, throw exception.
+     */
+    StoragePlugins plugins = null;
+    StoragePlugins cachedPlugins = null;
+    Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
+    try{
+      URL url = Resources.class.getClassLoader().getResource("storage-plugins.json");
+      if (url != null) {
+        String pluginsData = Resources.toString(url, Charsets.UTF_8);
+        plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
+      }
+      DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
+      StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
+      if (cachedPluginsSerializable != null) {
+        cachedPlugins = cachedPluginsSerializable.getObj();
+        logger.debug("Found cached storage plugin config: {}", cachedPlugins);
+      } else {
+        Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
+        logger.debug("caching storage plugin config {}", plugins);
+        map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
+        cachedPluginsSerializable = map.get("storage-plugins");
+        cachedPlugins = cachedPluginsSerializable.getObj();
+      }
+      if(!(plugins == null || cachedPlugins.equals(plugins))) {
+        logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
+        throw new DrillbitStartupException("Storage plugin config mismatch");
+      }
+      logger.debug("using plugin config: {}", cachedPlugins);
+    }catch(IOException e){
+      throw new IllegalStateException("Failure while reading storage plugins data.", e);
+    }
+    
+    for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
+      try{
+        StoragePlugin plugin = create(config.getKey(), config.getValue());
+        activePlugins.put(config.getKey(), plugin);
+      }catch(ExecutionSetupException e){
+        logger.error("Failure while setting up StoragePlugin with name: '{}'.", config.getKey(), e);
+      }
+    }
+<<<<<<< HEAD
+    activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+    
+    return activePlugins;
+=======
+    activeEngines.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+    activeEngines.put("OPTIONS", new OptionValueStoragePlugin(new OptionValueStorageConfig(), context, "OPTIONS"));
+
+    return activeEngines;
+>>>>>>> Drill 381 - implementation of session and global options.
+  }
+
+  public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
+    return plugins.get(registeredStoragePluginName);
+  }
+  
+  public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
+    if(config instanceof NamedStoragePluginConfig){
+      return plugins.get(((NamedStoragePluginConfig) config).name);
+    }else{
+      // TODO: for now, we'll throw away transient configs.  we really ought to clean these up.
+      return create(null, config);
+    }
+  }
+
+  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException{
+    StoragePlugin p = getPlugin(storageConfig);
+    if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin.  The actual type of plugin was %s.", p.getClass().getName()));
+    FileSystemPlugin storage = (FileSystemPlugin) p;
+    return storage.getFormatPlugin(formatConfig);
+  }
+
+  private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+    StoragePlugin plugin = null;
+    Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
+    if (c == null)
+      throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+          pluginConfig));
+    try {
+      plugin = c.newInstance(pluginConfig, context, name);
+      return plugin;
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
+      if (t instanceof ExecutionSetupException)
+        throw ((ExecutionSetupException) t);
+      throw new ExecutionSetupException(String.format(
+          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
+    }
+  }
+
+  @Override
+  public Iterator<Entry<String, StoragePlugin>> iterator() {
+    return plugins.entrySet().iterator();
+  }
+
+  public RuleSet getStoragePluginRuleSet() {
+    return storagePluginsRuleSet;
+  }
+
+  public DrillSchemaFactory getSchemaFactory(){
+    return schemaFactory;
+  }
+
+  public class DrillSchemaFactory implements SchemaFactory{
+
+    @Override
+    public void registerSchemas(DrillUser user, SchemaPlus parent) {
+      for(Map.Entry<String, StoragePlugin> e : plugins.entrySet()){
+        e.getValue().registerSchemas(user, parent);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index 256b6b6..eed4f03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
@@ -83,4 +84,10 @@ public class DirectGroupScan extends AbstractGroupScan{
   public String getDigest() {
     return String.valueOf(reader);
   }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index 8ae5116..84b6690 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
@@ -74,13 +76,13 @@ public class MockGroupScanPOP extends AbstractGroupScan {
   public List<MockScanEntry> getReadEntries() {
     return readEntries;
   }
-  
+
   public static class MockScanEntry{
 
     private final int records;
     private final MockColumn[] types;
     private final int recordSize;
-    
+
 
     @JsonCreator
     public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
@@ -97,7 +99,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public OperatorCost getCost() {
       return new OperatorCost(1, 2, 1, 1);
     }
-    
+
     public int getRecords() {
       return records;
     }
@@ -116,7 +118,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
       return "MockScanEntry [records=" + records + ", columns=" + Arrays.toString(types) + "]";
     }
   }
-  
+
   @JsonInclude(Include.NON_NULL)
   public static class MockColumn{
     @JsonProperty("type") public MinorType minorType;
@@ -125,8 +127,8 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public Integer width;
     public Integer precision;
     public Integer scale;
-    
-    
+
+
     @JsonCreator
     public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
       this.name = name;
@@ -136,7 +138,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
       this.precision = precision;
       this.scale = scale;
     }
-    
+
     @JsonProperty("type")
     public MinorType getMinorType() {
       return minorType;
@@ -156,7 +158,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public Integer getScale() {
       return scale;
     }
-    
+
     @JsonIgnore
     public MajorType getMajorType(){
       MajorType.Builder b = MajorType.newBuilder();
@@ -172,7 +174,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     public String toString() {
       return "MockColumn [minorType=" + minorType + ", name=" + name + ", mode=" + mode + "]";
     }
-    
+
   }
 
   @Override
@@ -184,7 +186,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
   @Override
   public void applyAssignments(List<DrillbitEndpoint> endpoints) {
     Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-    
+
     mappings = new LinkedList[endpoints.size()];
 
     int i =0;
@@ -230,6 +232,11 @@ public class MockGroupScanPOP extends AbstractGroupScan {
   }
 
   @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
+  @Override
   public String getDigest() {
     return toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
index f0d4fb6..0c9a1cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
@@ -48,7 +48,7 @@ abstract class AbstractWriter<V extends ValueVector> implements PojoWriter{
 
   @Override
   public void allocate() {
-    AllocationHelper.allocate(vector, 500, 100);
+    vector.allocateNew();
   }
 
   public void setValueCount(int valueCount){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
new file mode 100644
index 0000000..8ecb29f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoDataType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import com.google.common.collect.Lists;
+
+public class PojoDataType {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoDataType.class);
+
+  public List<SqlTypeName> types = Lists.newArrayList();
+  public List<String> names = Lists.newArrayList();
+
+  public PojoDataType(Class<?> pojoClass){
+    logger.debug(pojoClass.getName());
+    Field[] fields = pojoClass.getDeclaredFields();
+    for(int i = 0; i < fields.length; i++){
+      Field f = fields[i];
+
+      Class<?> type = f.getType();
+      names.add(f.getName());
+
+      if(type == int.class || type == Integer.class){
+        types.add(SqlTypeName.INTEGER);
+      }else if(type == boolean.class || type == Boolean.class){
+        types.add(SqlTypeName.BOOLEAN);
+      }else if(type == long.class || type == Long.class){
+        types.add(SqlTypeName.BIGINT);
+      }else if(type == double.class || type == Double.class){
+        types.add(SqlTypeName.DOUBLE);
+      }else if(type == String.class){
+        types.add(SqlTypeName.VARCHAR);
+      }else if(type.isEnum()){
+        types.add(SqlTypeName.VARCHAR);
+      }else{
+        throw new RuntimeException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type));
+      }
+    }
+  }
+
+
+  public RelDataType getRowType(RelDataTypeFactory f){
+    List<RelDataType> fields = Lists.newArrayList();
+    for(SqlTypeName n : types){
+      fields.add(f.createSqlType(n));
+    }
+    return f.createStructType(fields, names);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 8dac455..4203abc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -25,8 +25,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.pojo.Writers.BitWriter;
+import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.EnumWriter;
 import org.apache.drill.exec.store.pojo.Writers.IntWriter;
 import org.apache.drill.exec.store.pojo.Writers.LongWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter;
+import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter;
+import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
+import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
 import org.apache.drill.exec.store.pojo.Writers.StringWriter;
 
 public class PojoRecordReader<T> implements RecordReader{
@@ -48,7 +54,7 @@ public class PojoRecordReader<T> implements RecordReader{
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try{
-      Field[] fields = pojoClass.getFields();
+      Field[] fields = pojoClass.getDeclaredFields();
       writers = new PojoWriter[fields.length];
       for(int i = 0; i < writers.length; i++){
         Field f = fields[i];
@@ -56,6 +62,18 @@ public class PojoRecordReader<T> implements RecordReader{
 
         if(type == int.class){
           writers[i] = new IntWriter(f);
+        }else if(type == Integer.class){
+          writers[i] = new NIntWriter(f);
+        }else if(type == Long.class){
+          writers[i] = new NBigIntWriter(f);
+        }else if(type == Boolean.class){
+          writers[i] = new NBooleanWriter(f);
+        }else if(type == double.class){
+          writers[i] = new DoubleWriter(f);
+        }else if(type == Double.class){
+          writers[i] = new NDoubleWriter(f);
+        }else if(type.isEnum()){
+          writers[i] = new EnumWriter(f);
         }else if(type == boolean.class){
           writers[i] = new BitWriter(f);
         }else if(type == long.class){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
index 6910903..b986be8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
@@ -27,7 +27,12 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 
 import com.google.common.base.Charsets;
@@ -81,18 +86,32 @@ public class Writers {
 
   }
 
-  public static class StringWriter extends AbstractWriter<NullableVarCharVector>{
+  public static class DoubleWriter extends AbstractWriter<Float8Vector>{
 
+    public DoubleWriter(Field field) {
+      super(field, Types.required(MinorType.FLOAT8));
+      if(field.getType() != double.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      double d = field.getDouble(pojo);
+
+      return vector.getMutator().setSafe(outboundIndex, d);
+    }
+
+  }
+
+  private abstract static class AbstractStringWriter extends AbstractWriter<NullableVarCharVector>{
     private ByteBuf data;
     private final NullableVarCharHolder h = new NullableVarCharHolder();
 
-    public StringWriter(Field field) {
+    public AbstractStringWriter(Field field) {
       super(field, Types.optional(MinorType.VARCHAR));
-      if(field.getType() != String.class) throw new IllegalStateException();
       ensureLength(100);
     }
 
-    private void ensureLength(int len){
+    void ensureLength(int len){
       if(data == null || data.capacity() < len){
         if(data != null) data.release();
         data = UnpooledByteBufAllocator.DEFAULT.buffer(len);
@@ -103,11 +122,9 @@ public class Writers {
       data.release();
     }
 
-    @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      String s = (String) field.get(pojo);
+    public boolean writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       if(s == null){
-        h.isSet = 0;
+        return true;
       }else{
         h.isSet = 1;
         byte[] bytes = s.getBytes(Charsets.UTF_8);
@@ -117,10 +134,108 @@ public class Writers {
         h.buffer = data;
         h.start = 0;
         h.end = bytes.length;
+        return vector.getMutator().setSafe(outboundIndex, h);
+
+      }
+
+    }
+
+  }
+
+  public static class EnumWriter extends AbstractStringWriter{
+    public EnumWriter(Field field) {
+      super(field);
+      if(!field.getType().isEnum()) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Enum<?> e= ((Enum<?>) field.get(pojo));
+      if(e == null) return true;
+      return writeString(e.name(), outboundIndex);
+    }
+  }
+
+  public static class StringWriter extends AbstractStringWriter {
+    public StringWriter(Field field) {
+      super(field);
+      if(field.getType() != String.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      String s = (String) field.get(pojo);
+      return writeString(s, outboundIndex);
+    }
+  }
+
+  public static class NIntWriter extends AbstractWriter<NullableIntVector>{
+
+    public NIntWriter(Field field) {
+      super(field, Types.optional(MinorType.INT));
+      if(field.getType() != Integer.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Integer i = (Integer) field.get(pojo);
+      if(i != null){
+        return vector.getMutator().setSafe(outboundIndex, i);
+      }
+      return true;
+    }
+
+  }
 
+  public static class NBigIntWriter extends AbstractWriter<NullableBigIntVector>{
+
+    public NBigIntWriter(Field field) {
+      super(field, Types.optional(MinorType.BIGINT));
+      if(field.getType() != Long.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Long o = (Long) field.get(pojo);
+      if(o != null){
+        return vector.getMutator().setSafe(outboundIndex, o);
       }
+      return true;
+    }
 
-      return vector.getMutator().setSafe(outboundIndex, h);
+  }
+
+  public static class NBooleanWriter extends AbstractWriter<NullableBitVector>{
+
+    public NBooleanWriter(Field field) {
+      super(field, Types.optional(MinorType.BIT));
+      if(field.getType() != Boolean.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Boolean o = (Boolean) field.get(pojo);
+      if(o != null){
+        return vector.getMutator().setSafe(outboundIndex, o ? 1 : 0);
+      }
+      return true;
+    }
+
+  }
+  public static class NDoubleWriter extends AbstractWriter<NullableFloat8Vector>{
+
+    public NDoubleWriter(Field field) {
+      super(field, Types.optional(MinorType.FLOAT8));
+      if(field.getType() != Double.class) throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+      Double o = (Double) field.get(pojo);
+      if(o != null){
+        return vector.getMutator().setSafe(outboundIndex, o);
+      }
+      return true;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
new file mode 100644
index 0000000..844fd68
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class DrillbitIterator implements Iterator<Object> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitIterator.class);
+
+  private Iterator<DrillbitEndpoint> endpoints;
+
+  public DrillbitIterator(FragmentContext c) {
+    this.endpoints = c.getDrillbitContext().getBits().iterator();
+  }
+
+  public static class DrillbitInstance {
+    public String host;
+    public int user_port;
+    public int control_port;
+    public int data_port;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return endpoints.hasNext();
+  }
+
+  @Override
+  public Object next() {
+    DrillbitEndpoint ep = endpoints.next();
+    DrillbitInstance i = new DrillbitInstance();
+    i.host = ep.getAddress();
+    i.user_port = ep.getUserPort();
+    i.control_port = ep.getControlPort();
+    i.data_port = ep.getDataPort();
+    return i;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
new file mode 100644
index 0000000..c1e8dd1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public class StaticDrillTable extends DrillTable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StaticDrillTable.class);
+
+  private final PojoDataType type;
+
+  public StaticDrillTable(PojoDataType type, String storageEngineName, StoragePlugin plugin, Object selection) {
+    super(storageEngineName, plugin, selection);
+    this.type = type;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return type.getRowType(typeFactory);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
new file mode 100644
index 0000000..4301f12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.pojo.PojoDataType;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+
+public enum SystemTable {
+  OPTION("options", OptionValue.class),
+  DRILLBITS("drillbits", DrillbitIterator.DrillbitInstance.class)
+  ;
+
+  private final PojoDataType type;
+  private final String tableName;
+  private final Class<?> pojoClass;
+
+  SystemTable(String tableName, Class<?> clazz){
+    this.type = new PojoDataType(clazz);
+    this.tableName = tableName;
+    this.pojoClass = clazz;
+  }
+
+  public String getTableName(){
+    return tableName;
+  }
+
+  public RelDataType getRowType(RelDataTypeFactory f){
+    return type.getRowType(f);
+  }
+
+  public PojoDataType getType(){
+    return type;
+  }
+
+  public Class<?> getPojoClass(){
+    return pojoClass;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/08923cb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
new file mode 100644
index 0000000..a1bec1e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
+
+public class SystemTableBatchCreator implements BatchCreator<SystemTableScan>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Override
+  public RecordBatch getBatch(FragmentContext context, SystemTableScan scan, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Iterator<Object> iter = scan.getPlugin().getRecordIterator(context, scan.getTable());
+    PojoRecordReader reader = new PojoRecordReader(scan.getTable().getPojoClass(), iter);
+
+    return new ScanBatch(scan, context, Collections.singleton( (RecordReader) reader).iterator());
+  }
+}