You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/11/20 17:44:38 UTC

[02/12] incubator-drill git commit: DRILL-1684, DRILL-1517, DRILL-1350: Profile and cancellation updates - Remove any storage of persisted profiles. - Store a separate query info object for active queries. - Update cancellation and running profile loadin

DRILL-1684, DRILL-1517, DRILL-1350: Profile and cancellation updates
- Remove any storage of persisted profiles.
- Store a separate query info object for active queries.
- Update cancellation and running profile loading to query foreman server.
- Make file store support HDFS APIs
- Update PStoreProvider to use configuration to decide if you want PERSISTENT, EPHEMERAL, or BLOB storage rather than separate interfaces.
- Update ZkPStore's persistent mode to leverage a cache and respond to changes rather than actively probing values.
- Update ZkPStore's cache to be effectively write-through.
- Automatically delete deprecated or default value options from PStore.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2eb72a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2eb72a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2eb72a7c

Branch: refs/heads/master
Commit: 2eb72a7c238fe960b61edbae0d3dea1c836c746f
Parents: 90c12c8
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Nov 9 15:16:52 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Nov 19 21:35:39 2014 -0800

----------------------------------------------------------------------
 .../exec/store/hbase/config/HBasePStore.java    |   17 +-
 .../store/hbase/config/HBasePStoreProvider.java |   45 +-
 .../drill/hbase/TestHBaseTableProvider.java     |    4 +-
 .../exec/store/mongo/config/MongoPStore.java    |   10 -
 .../store/mongo/config/MongoPStoreProvider.java |   16 +-
 .../exec/rpc/control/ControlRpcConfig.java      |    1 +
 .../drill/exec/rpc/control/ControlTunnel.java   |   20 +
 .../drill/exec/rpc/data/DataRpcConfig.java      |    1 +
 .../server/options/FallbackOptionManager.java   |   97 ++
 .../server/options/FragmentOptionManager.java   |   50 +-
 .../server/options/InMemoryOptionManager.java   |   50 +
 .../exec/server/options/QueryOptionManager.java |   72 +-
 .../server/options/SessionOptionManager.java    |   71 +-
 .../server/options/SystemOptionManager.java     |   79 +-
 .../drill/exec/store/StoragePluginRegistry.java |    2 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |    3 +-
 .../org/apache/drill/exec/store/sys/EStore.java |    6 +-
 .../drill/exec/store/sys/EStoreProvider.java    |    2 +-
 .../org/apache/drill/exec/store/sys/PStore.java |    6 +-
 .../drill/exec/store/sys/PStoreConfig.java      |   47 +-
 .../drill/exec/store/sys/PStoreProvider.java    |    4 +-
 .../drill/exec/store/sys/local/FilePStore.java  |  231 ++++
 .../store/sys/local/LocalEStoreProvider.java    |   13 +-
 .../drill/exec/store/sys/local/LocalPStore.java |  208 ----
 .../store/sys/local/LocalPStoreProvider.java    |   47 +-
 .../drill/exec/store/sys/local/MapEStore.java   |    6 +
 .../store/sys/local/NoWriteLocalPStore.java     |   10 -
 .../exec/store/sys/zk/ZkAbstractStore.java      |   89 +-
 .../exec/store/sys/zk/ZkEStoreProvider.java     |    6 +-
 .../drill/exec/store/sys/zk/ZkPStore.java       |   56 +-
 .../exec/store/sys/zk/ZkPStoreProvider.java     |   61 +-
 .../exec/work/batch/ControlHandlerImpl.java     |   18 +-
 .../apache/drill/exec/work/foreman/Foreman.java |    2 +-
 .../drill/exec/work/foreman/QueryStatus.java    |   73 +-
 .../apache/drill/exec/work/user/UserWorker.java |   10 +-
 .../drill/exec/store/sys/PStoreTestUtil.java    |    6 +-
 .../exec/store/sys/TestPStoreProviders.java     |    2 +-
 .../org/apache/drill/exec/proto/BitControl.java |   29 +-
 .../drill/exec/proto/SchemaUserBitShared.java   |  141 +++
 .../apache/drill/exec/proto/UserBitShared.java  | 1125 +++++++++++++++++-
 .../drill/exec/proto/beans/QueryInfo.java       |  253 ++++
 protocol/src/main/protobuf/BitControl.proto     |    1 +
 protocol/src/main/protobuf/UserBitShared.proto  |    7 +
 43 files changed, 2257 insertions(+), 740 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
index b5a697c..59a3125 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.hbase.config;
 
 import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY;
-import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY_BLOB;
 import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.QUALIFIER;
 
 import java.io.IOException;
@@ -63,16 +62,15 @@ public class HBasePStore<V> implements PStore<V> {
     return get(key, FAMILY);
   }
 
-  @Override
-  public V getBlob(String key) {
-    return get(key, FAMILY_BLOB);
-  }
-
   protected synchronized V get(String key, byte[] family) {
     try {
       Get get = new Get(row(key));
       get.addColumn(family, QUALIFIER);
-      return value(table.get(get));
+      Result r = table.get(get);
+      if(r.isEmpty()){
+        return null;
+      }
+      return value(r);
     } catch (IOException e) {
       throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
     }
@@ -83,11 +81,6 @@ public class HBasePStore<V> implements PStore<V> {
     put(key, FAMILY, value);
   }
 
-  @Override
-  public void putBlob(String key, V value) {
-    put(key, FAMILY_BLOB, value);
-  }
-
   protected synchronized void put(String key, byte[] family, V value) {
     try {
       Put put = new Put(row(key));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
index 52808d4..b3947b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
@@ -19,22 +19,16 @@ package org.apache.drill.exec.store.hbase.config;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.collect.Maps;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
 import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
-import org.apache.drill.exec.store.sys.local.MapEStore;
-import org.apache.drill.exec.store.sys.zk.ZkEStore;
 import org.apache.drill.exec.store.sys.zk.ZkEStoreProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -54,8 +48,6 @@ public class HBasePStoreProvider implements PStoreProvider {
 
   static final byte[] FAMILY = Bytes.toBytes("s");
 
-  static final byte[] FAMILY_BLOB = Bytes.toBytes("t");
-
   static final byte[] QUALIFIER = Bytes.toBytes("d");
 
   private final String storeTableName;
@@ -104,11 +96,28 @@ public class HBasePStoreProvider implements PStoreProvider {
     this.zkAvailable = false;
   }
 
+
+
   @Override
-  public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
-    return new HBasePStore<V>(store, this.table);
+  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+    switch(config.getMode()){
+    case EPHEMERAL:
+      if (this.zkAvailable) {
+        return zkEStoreProvider.getStore(config);
+      } else {
+        return localEStoreProvider.getStore(config);
+      }
+
+    case BLOB_PERSISTENT:
+    case PERSISTENT:
+      return new HBasePStore<V>(config, this.table);
+
+    default:
+      throw new IllegalStateException();
+    }
   }
 
+
   @Override
   public void start() throws IOException {
     this.connection = HConnectionManager.createConnection(hbaseConf);
@@ -117,14 +126,13 @@ public class HBasePStoreProvider implements PStoreProvider {
       if (!admin.tableExists(storeTableName)) {
         HTableDescriptor desc = new HTableDescriptor(storeTableName);
         desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1));
-        desc.addFamily(new HColumnDescriptor(FAMILY_BLOB).setMaxVersions(1));
         admin.createTable(desc);
       } else {
         HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName));
-        if (!desc.hasFamily(FAMILY) || !desc.hasFamily(FAMILY_BLOB)) {
+        if (!desc.hasFamily(FAMILY)) {
           throw new DrillRuntimeException("The HBase table " + storeTableName
               + " specified as persistent store exists but does not contain column family: "
-              + (desc.hasFamily(FAMILY) ? Bytes.toString(FAMILY_BLOB) : Bytes.toString(FAMILY)));
+              + (Bytes.toString(FAMILY)));
         }
       }
     }
@@ -134,17 +142,6 @@ public class HBasePStoreProvider implements PStoreProvider {
   }
 
   @Override
-  public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
-    // when ZK is available, use ZK as the Ephemeral store.
-    // when ZK is not available, use a Map as the Ephemeral store.
-    if (this.zkAvailable) {
-      return zkEStoreProvider.getEStore(store);
-    } else {
-      return localEStoreProvider.getEStore(store);
-    }
-  }
-
-  @Override
   public synchronized void close() {
     if (this.table != null) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
index b1ff44c..5f2d6c7 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
@@ -41,7 +41,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
 
   @Test
   public void testTableProvider() throws IOException {
-    PStore<String> hbaseStore = provider.getPStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase").build());
+    PStore<String> hbaseStore = provider.getStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase").build());
     hbaseStore.put("", "v0");
     hbaseStore.put("k1", "v1");
     hbaseStore.put("k2", "v2");
@@ -60,7 +60,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
     }
     assertEquals(7, rowCount);
 
-    PStore<String> hbaseTestStore = provider.getPStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase.test").build());
+    PStore<String> hbaseTestStore = provider.getStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase.test").build());
     hbaseTestStore.put("", "v0");
     hbaseTestStore.put("k1", "v1");
     hbaseTestStore.put("k2", "v2");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
index f256c98..fc5c05b 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
@@ -69,11 +69,6 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
   }
 
   @Override
-  public V getBlob(String key) {
-    throw new UnsupportedOperationException("Mongo DB PStore not currently supported");
-  }
-
-  @Override
   public void put(String key, V value) {
     try {
       DBObject putObj = new BasicDBObject(2);
@@ -87,11 +82,6 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
   }
 
   @Override
-  public void putBlob(String key, V value) {
-    throw new UnsupportedOperationException("Mongo DB PStore not currently supported");
-  }
-
-  @Override
   public boolean putIfAbsent(String key, V value) {
     try {
       DBObject check = new BasicDBObject(1).append(ID, key);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
index eb4ba53..7443c2e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
@@ -68,13 +68,17 @@ public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants
   }
 
   @Override
-  public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException {
-    return localEStoreProvider.getEStore(storeConfig);
-  }
+  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+    switch(config.getMode()){
+    case BLOB_PERSISTENT:
+    case PERSISTENT:
+      return new MongoPStore<>(config, collection);
+    case EPHEMERAL:
+      return localEStoreProvider.getStore(config);
+    default:
+      throw new IllegalStateException();
 
-  @Override
-  public <V> PStore<V> getPStore(PStoreConfig<V> config) throws IOException {
-    return new MongoPStore<>(config, collection);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 1308c37..37730e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -38,6 +38,7 @@ public class ControlRpcConfig {
       .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
       .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
       .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
       .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
       .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
       .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 461cd8a..a4f9fdf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -60,6 +60,12 @@ public class ControlTunnel {
     manager.runCommand(b);
   }
 
+  public DrillRpcFuture<Ack> requestCancelQuery(QueryId queryId){
+    CancelQuery c = new CancelQuery(queryId);
+    manager.runCommand(c);
+    return c.getFuture();
+  }
+
   public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, FinishedReceiver finishedReceiver){
     ReceiverFinished b = new ReceiverFinished(outcomeListener, finishedReceiver);
     manager.runCommand(b);
@@ -151,4 +157,18 @@ public class ControlTunnel {
       connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class);
     }
   }
+
+  public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection> {
+    final QueryId queryId;
+
+    public CancelQuery(QueryId queryId) {
+      super();
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index 75fa17c..b54841d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -38,4 +38,5 @@ public class DataRpcConfig {
   public static int RPC_VERSION = 3;
 
   public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+  public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
new file mode 100644
index 0000000..1a5e137
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public abstract class FallbackOptionManager implements OptionManager{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FallbackOptionManager.class);
+
+  private Map<String, OptionValue> options = Maps.newConcurrentMap();
+  private OptionManager fallback;
+
+  public FallbackOptionManager(OptionManager fallback) {
+    super();
+    this.fallback = fallback;
+  }
+
+  @Override
+  public Iterator<OptionValue> iterator() {
+    return Iterables.concat(fallback, options.values()).iterator();
+  }
+
+  @Override
+  public OptionValue getOption(String name) {
+    OptionValue opt = getLocalOption(name);
+    if(opt == null && fallback != null){
+      return fallback.getOption(name);
+    }else{
+      return opt;
+    }
+  }
+
+  abstract OptionValue getLocalOption(String name);
+  abstract boolean setLocalOption(OptionValue value);
+
+  @Override
+  public void setOption(OptionValue value) {
+    fallback.getAdmin().validate(value);
+    setValidatedOption(value);
+  }
+
+  @Override
+  public void setOption(String name, SqlLiteral literal, OptionType type) {
+    OptionValue val = getAdmin().validate(name, literal);
+    val.type = type;
+    setValidatedOption(val);
+  }
+
+  private void setValidatedOption(OptionValue value) {
+    if (!setLocalOption(value)) {
+      fallback.setOption(value);
+    }
+  }
+
+
+  @Override
+  public OptionAdmin getAdmin() {
+    return fallback.getAdmin();
+  }
+
+  @Override
+  public OptionManager getSystemManager() {
+    return fallback.getSystemManager();
+  }
+
+  @Override
+  public OptionList getOptionList() {
+    OptionList list = new OptionList();
+    for (OptionValue o : options.values()) {
+      list.add(o);
+    }
+    return list;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
index 46d316b..e4dbbf8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
@@ -17,67 +17,31 @@
  */
 package org.apache.drill.exec.server.options;
 
-import java.util.Iterator;
 import java.util.Map;
 
-import org.eigenbase.sql.SqlLiteral;
-
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
-public class FragmentOptionManager implements OptionManager {
+public class FragmentOptionManager extends InMemoryOptionManager {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class);
 
-  ImmutableMap<String, OptionValue> options;
-  OptionManager systemOptions;
-
   public FragmentOptionManager(OptionManager systemOptions, OptionList options) {
+    super(systemOptions, getMapFromOptionList(options));
+  }
+
+  private static Map<String, OptionValue> getMapFromOptionList(OptionList options){
     Map<String, OptionValue> tmp = Maps.newHashMap();
     for(OptionValue v : options){
       tmp.put(v.name, v);
     }
-    this.options = ImmutableMap.copyOf(tmp);
-    this.systemOptions = systemOptions;
-  }
-
-  @Override
-  public Iterator<OptionValue> iterator() {
-    return Iterables.concat(systemOptions, options.values()).iterator();
+    return ImmutableMap.copyOf(tmp);
   }
 
   @Override
-  public OptionValue getOption(String name) {
-    OptionValue value = options.get(name);
-    if (value == null && systemOptions != null) {
-      value = systemOptions.getOption(name);
-    }
-    return value;
-  }
-
-  @Override
-  public void setOption(OptionValue value) throws SetOptionException {
+  boolean supportsOption(OptionValue value) {
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  public void setOption(String name, SqlLiteral literal, OptionValue.OptionType type) throws SetOptionException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public OptionAdmin getAdmin() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public OptionManager getSystemManager() {
-    throw new UnsupportedOperationException();
-  }
 
-  @Override
-  public OptionList getOptionList() {
-    throw new UnsupportedOperationException();
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
new file mode 100644
index 0000000..59411ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Map;
+
+public abstract class InMemoryOptionManager extends FallbackOptionManager {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryOptionManager.class);
+
+  final Map<String, OptionValue> options;
+
+  InMemoryOptionManager(OptionManager fallback, Map<String, OptionValue> options) {
+    super(fallback);
+    this.options = options;
+  }
+
+  @Override
+  OptionValue getLocalOption(String name) {
+    return options.get(name);
+  }
+
+  @Override
+  boolean setLocalOption(OptionValue value) {
+    if(supportsOption(value)){
+      options.put(value.name, value);
+      return true;
+    }else{
+      return false;
+    }
+
+  }
+
+  abstract boolean supportsOption(OptionValue value);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
index 82fc5ba..d04b654 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
@@ -17,81 +17,21 @@
  */
 package org.apache.drill.exec.server.options;
 
-import java.util.Iterator;
-import java.util.Map;
+import java.util.HashMap;
 
-import org.eigenbase.sql.SqlLiteral;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class QueryOptionManager implements OptionManager {
+public class QueryOptionManager extends InMemoryOptionManager {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
 
-  private Map<String, OptionValue> options = Maps.newConcurrentMap();
-  private OptionManager sessionOptions;
-
   public QueryOptionManager(OptionManager sessionOptions) {
-    super();
-    this.sessionOptions = sessionOptions;
-  }
-
-  @Override
-  public Iterator<OptionValue> iterator() {
-    return Iterables.concat(sessionOptions, options.values()).iterator();
-  }
-
-  @Override
-  public OptionValue getOption(String name) {
-    OptionValue opt = options.get(name);
-    if (opt == null && sessionOptions != null) {
-      return sessionOptions.getOption(name);
-    } else {
-      return opt;
-    }
-  }
-
-  @Override
-  public void setOption(OptionValue value) {
-    sessionOptions.getAdmin().validate(value);
-    setValidatedOption(value);
+    super(sessionOptions, new HashMap<String, OptionValue>());
   }
 
   @Override
-  public void setOption(String name, SqlLiteral literal, OptionValue.OptionType type) {
-    OptionValue val = sessionOptions.getAdmin().validate(name, literal);
-    val.type = type;
-    setValidatedOption(val);
+  boolean supportsOption(OptionValue value) {
+    return value.type == OptionType.QUERY;
   }
 
-  private void setValidatedOption(OptionValue value) {
-    if (value.type == OptionValue.OptionType.QUERY) {
-      options.put(value.name, value);
-    } else {
-      sessionOptions.setOption(value);
-    }
-  }
-
-  @Override
-  public OptionManager.OptionAdmin getAdmin() {
-    return sessionOptions.getAdmin();
-  }
-
-  @Override
-  public OptionManager getSystemManager() {
-    return sessionOptions.getSystemManager();
-  }
-
-  @Override
-  public OptionList getOptionList() {
-    OptionList list = new OptionList();
-    list.addAll(sessionOptions.getOptionList());
-    list.addAll(options.values());
-    return list;
-  }
-
-  public OptionList getSessionOptionList() {
-    return sessionOptions.getOptionList();
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index 4268d02..45c0ce8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -17,79 +17,18 @@
  */
 package org.apache.drill.exec.server.options;
 
-import java.util.Iterator;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import org.eigenbase.sql.SqlLiteral;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class SessionOptionManager implements OptionManager{
+public class SessionOptionManager extends InMemoryOptionManager{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
 
-  private Map<String, OptionValue> options = Maps.newConcurrentMap();
-  private OptionManager systemOptions;
-
   public SessionOptionManager(OptionManager systemOptions) {
-    super();
-    this.systemOptions = systemOptions;
-  }
-
-  @Override
-  public Iterator<OptionValue> iterator() {
-    return Iterables.concat(systemOptions, options.values()).iterator();
-  }
-
-  @Override
-  public OptionValue getOption(String name) {
-    OptionValue opt = options.get(name);
-    if(opt == null && systemOptions != null){
-      return systemOptions.getOption(name);
-    }else{
-      return opt;
-    }
-  }
-
-  @Override
-  public void setOption(OptionValue value) {
-    systemOptions.getAdmin().validate(value);
-    setValidatedOption(value);
-  }
-
-  @Override
-  public void setOption(String name, SqlLiteral literal, OptionType type) {
-    OptionValue val = systemOptions.getAdmin().validate(name, literal);
-    val.type = type;
-    setValidatedOption(val);
-  }
-
-  private void setValidatedOption(OptionValue value) {
-    if (value.type == OptionType.SESSION) {
-      options.put(value.name, value);
-    } else {
-      systemOptions.setOption(value);
-    }
-  }
-
-  @Override
-  public OptionAdmin getAdmin() {
-    return systemOptions.getAdmin();
-  }
-
-  @Override
-  public OptionManager getSystemManager() {
-    return systemOptions;
+    super(systemOptions, new ConcurrentHashMap<String, OptionValue>());
   }
 
   @Override
-  public OptionList getOptionList() {
-    OptionList list = new OptionList();
-    for (OptionValue o : options.values()) {
-      list.add(o);
-    }
-    return list;
+  boolean supportsOption(OptionValue value) {
+    return value.type == OptionValue.OptionType.SESSION;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9f912e0..1622d7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.server.options;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.collections.IteratorUtils;
@@ -99,57 +100,61 @@ public class SystemOptionManager implements OptionManager {
   }
 
   public SystemOptionManager init() throws IOException{
-    this.options = provider.getPStore(config);
+    this.options = provider.getStore(config);
     this.admin = new SystemOptionAdmin();
     return this;
   }
 
-  private class Iter implements Iterator<OptionValue>{
-    private Iterator<Map.Entry<String, OptionValue>> inner;
-
-    public Iter(Iterator<Map.Entry<String, OptionValue>> inner) {
-      this.inner = inner;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return inner.hasNext();
-    }
-
-    @Override
-    public OptionValue next() {
-      return inner.next().getValue();
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-
-  }
   @Override
   public Iterator<OptionValue> iterator() {
-    return new Iter(options.iterator());
+    Map<String, OptionValue> buildList = Maps.newHashMap();
+    for(OptionValidator v : knownOptions.values()){
+      buildList.put(v.getOptionName(), v.getDefault());
+    }
+    for(Map.Entry<String, OptionValue> v : options){
+      OptionValue value = v.getValue();
+      buildList.put(value.name, value);
+    }
+    return buildList.values().iterator();
   }
 
   @Override
   public OptionValue getOption(String name) {
-    return options.get(name);
+    // check local space
+    OptionValue v = options.get(name);
+    if(v != null){
+      return v;
+    }
+
+    // otherwise, return default.
+    OptionValidator validator = knownOptions.get(name);
+    if(validator == null){
+      return null;
+    }else{
+      return validator.getDefault();
+    }
   }
 
   @Override
   public void setOption(OptionValue value) {
     assert value.type == OptionType.SYSTEM;
     admin.validate(value);
-    options.put(value.name, value);
+    setOptionInternal(value);
+  }
+
+  private void setOptionInternal(OptionValue value){
+    if(!value.equals(knownOptions.get(value.name))){
+      options.put(value.name, value);
+    }
   }
 
+
   @Override
   public void setOption(String name, SqlLiteral literal, OptionType type) {
     assert type == OptionValue.OptionType.SYSTEM;
     OptionValue v = admin.validate(name, literal);
     v.type = type;
-    options.put(name, v);
+    setOptionInternal(v);
   }
 
   @Override
@@ -172,10 +177,24 @@ public class SystemOptionManager implements OptionManager {
     public SystemOptionAdmin() {
       for(OptionValidator v : VALIDATORS) {
         knownOptions.put(v.getOptionName(), v);
-        options.putIfAbsent(v.getOptionName(), v.getDefault());
       }
-    }
 
+      for(Entry<String, OptionValue> v : options){
+        OptionValue value = v.getValue();
+        OptionValidator defaultValidator = knownOptions.get(v.getKey());
+        if(defaultValidator == null){
+          // deprecated option, delete.
+          options.delete(value.name);
+          logger.warn("Deleting deprecated option `{}`.", value.name);
+        }else if(value.equals(defaultValidator)){
+          // option set with default value, remove storage of record.
+          options.delete(value.name);
+          logger.warn("Deleting option `{}` set to default value.", value.name);
+        }
+
+      }
+
+    }
 
     @Override
     public void registerOptionType(OptionValidator validator) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index d6c8ee0..5d0eed6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -87,7 +87,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
       this.context = context;
       this.pluginSystemTable = context //
           .getPersistentStoreProvider() //
-          .getPStore(PStoreConfig //
+          .getStore(PStoreConfig //
               .newJacksonBuilder(context.getConfig().getMapper(), StoragePluginConfig.class) //
               .name("sys.storage_plugins") //
               .build());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index acd8fcb..7b9d52c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -88,8 +88,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
     if (storageEngineName == null) {
       this.knownViews = null;
     } else {
-      this.knownViews = provider.getPStore(PStoreConfig //
+      this.knownViews = provider.getStore(PStoreConfig //
           .newJacksonBuilder(drillConfig.getMapper(), String.class) //
+          .persist() //
           .name(Joiner.on('.').join("storage.views", storageEngineName, schemaName)) //
           .build());
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
index 7214092..2d04957 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
@@ -18,15 +18,11 @@
 
 package org.apache.drill.exec.store.sys;
 
-import java.util.Map;
 
 /**
  * Interfaces to define EStore, which is keep track of status/information for running queries. The information
  * would be gone, if the query is completed, or the foreman drillbit is not responding.
  * @param <V>
  */
-public interface EStore <V> extends Iterable<Map.Entry<String, V>> {
-  public V get(String key);
-  public void put(String key, V value);
-  public void delete(String key);
+public interface EStore <V> extends PStore<V> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
index 32bf0b1..b09c5b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
@@ -25,5 +25,5 @@ import java.io.IOException;
  */
 
 public interface EStoreProvider {
-  public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException;
+  public <V> PStore<V> getStore(PStoreConfig<V> table) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
index 040a99d..26c00ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
@@ -19,11 +19,13 @@ package org.apache.drill.exec.store.sys;
 
 import java.util.Map;
 
+/**
+ * Interface for reading and writing values to a persistent storage provider.  Iterators are guaranteed to be returned in key order.
+ * @param <V>
+ */
 public interface PStore<V> extends Iterable<Map.Entry<String, V>> {
   public V get(String key);
-  public V getBlob(String key);
   public void put(String key, V value);
-  public void putBlob(String key, V value);
   public boolean putIfAbsent(String key, V value);
   public void delete(String key);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
index 7d5243f..83c2243 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
@@ -31,11 +31,25 @@ public class PStoreConfig<V> {
 
   private final String name;
   private final PClassSerializer<V> valueSerializer;
+  private final Mode mode;
+  private final int maxIteratorSize;
 
-  private PStoreConfig(String name, PClassSerializer<V> valueSerializer) {
+  public static enum Mode {PERSISTENT, EPHEMERAL, BLOB_PERSISTENT};
+
+  private PStoreConfig(String name, PClassSerializer<V> valueSerializer, Mode mode, int maxIteratorSize) {
     super();
     this.name = name;
     this.valueSerializer = valueSerializer;
+    this.mode = mode;
+    this.maxIteratorSize = Math.abs(maxIteratorSize);
+  }
+
+  public Mode getMode() {
+    return mode;
+  }
+
+  public int getMaxIteratorSize() {
+    return maxIteratorSize;
   }
 
   public String getName() {
@@ -57,20 +71,47 @@ public class PStoreConfig<V> {
   public static class PStoreConfigBuilder<V> {
     String name;
     PClassSerializer<V> serializer;
+    Mode mode = Mode.PERSISTENT;
+    int maxIteratorSize = Integer.MAX_VALUE;
 
     PStoreConfigBuilder(PClassSerializer<V> serializer) {
       super();
       this.serializer = serializer;
     }
 
-    public <X extends Builder> PStoreConfigBuilder<V> name(String name) {
+    public PStoreConfigBuilder<V> name(String name) {
       this.name = name;
       return this;
     }
 
+    public PStoreConfigBuilder<V> persist(){
+      this.mode = Mode.PERSISTENT;
+      return this;
+    }
+
+    public PStoreConfigBuilder<V> ephemeral(){
+      this.mode = Mode.EPHEMERAL;
+      return this;
+    }
+
+    public PStoreConfigBuilder<V> blob(){
+      this.mode = Mode.BLOB_PERSISTENT;
+      return this;
+    }
+
+    /**
+     * Set the maximum size of the iterator.  Positive numbers start from the start of the list.  Negative numbers start from the end of the list.
+     * @param size
+     * @return
+     */
+    public PStoreConfigBuilder<V> max(int size){
+      this.maxIteratorSize = size;
+      return this;
+    }
+
     public PStoreConfig<V> build(){
       Preconditions.checkNotNull(name);
-      return new PStoreConfig<V>(name, serializer);
+      return new PStoreConfig<V>(name, serializer, mode, maxIteratorSize);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
index f6154e2..6371dfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
@@ -21,7 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 public interface PStoreProvider extends AutoCloseable, Closeable{
-  public <V> PStore<V> getPStore(PStoreConfig<V> table) throws IOException;
+
+  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException;
   public void start() throws IOException;
-  public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
new file mode 100644
index 0000000..9f6ec29
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.local;
+
+import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class FilePStore<V> implements PStore<V> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilePStore.class);
+
+
+  private final Path basePath;
+  private final PStoreConfig<V> config;
+  private final DrillFileSystem fs;
+
+  public FilePStore(DrillFileSystem fs, Path base, PStoreConfig<V> config) {
+    super();
+    this.basePath = new Path(base, config.getName());
+    this.config = config;
+    this.fs = fs;
+
+    try {
+      mk(basePath);
+    } catch (IOException e) {
+      throw new RuntimeException("Failure setting pstore configuration path.");
+    }
+  }
+
+  private void mk(Path path) throws IOException{
+    fs.getUnderlying().mkdirs(path);
+  }
+
+  public static Path getLogDir(){
+    String drillLogDir = System.getenv("DRILL_LOG_DIR");
+    if (drillLogDir == null) {
+      drillLogDir = "/var/log/drill";
+    }
+    return new Path("file://" + new File(drillLogDir).getAbsoluteFile().toString());
+  }
+
+  public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{
+    Path blobRoot = root == null ? getLogDir() : root;
+    Configuration fsConf = new Configuration();
+    if(blobRoot.toUri().getScheme() != null){
+      fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
+    }
+
+
+    DrillFileSystem fs = FileSystemCreator.getFileSystem(config, fsConf);
+    fs.getUnderlying().mkdirs(blobRoot);
+    return fs;
+  }
+
+  @Override
+  public Iterator<Entry<String, V>> iterator() {
+    try{
+      List<FileStatus> f = fs.list(false, basePath);
+      if (f == null || f.isEmpty()) {
+        return Collections.emptyIterator();
+      }
+      List<String> files = Lists.newArrayList();
+
+      for (FileStatus stat : f) {
+        String s = stat.getPath().getName();
+        if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
+          files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+        }
+      }
+
+      Collections.sort(files);
+      files = files.subList(0, Math.min(files.size(), config.getMaxIteratorSize()));
+      return new Iter(files.iterator());
+
+    }catch(IOException e){
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Path p(String name) throws IOException {
+    Preconditions.checkArgument(
+        !name.contains("/") &&
+        !name.contains(":") &&
+        !name.contains(".."));
+
+    Path f = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
+    // do this to check file name.
+    return f;
+  }
+
+  public V get(String key) {
+    try{
+      Path path = p(key);
+      if(!fs.getUnderlying().exists(path)){
+        return null;
+      }
+    }catch(IOException e){
+      throw new RuntimeException(e);
+    }
+
+    try (InputStream is = fs.open(p(key)).getInputStream()) {
+      return config.getSerializer().deserialize(IOUtils.toByteArray(is));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void put(String key, V value) {
+    try (OutputStream os = fs.create(p(key)).getOuputStream()) {
+      IOUtils.write(config.getSerializer().serialize(value), os);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    try {
+      Path p = p(key);
+      if (fs.getUnderlying().exists(p)) {
+        return false;
+      } else {
+        put(key, value);
+        return true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void delete(String key) {
+    try {
+      fs.getUnderlying().delete(p(key), false);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private class Iter implements Iterator<Entry<String, V>>{
+
+    private Iterator<String> keys;
+    private String current;
+
+    public Iter(Iterator<String> keys) {
+      super();
+      this.keys = keys;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return keys.hasNext();
+    }
+
+    @Override
+    public Entry<String, V> next() {
+      current = keys.next();
+      return new DeferredEntry(current);
+    }
+
+    @Override
+    public void remove() {
+      delete(current);
+      keys.remove();
+    }
+
+    private class DeferredEntry implements Entry<String, V> {
+
+      private String name;
+
+
+      public DeferredEntry(String name) {
+        super();
+        this.name = name;
+      }
+
+      @Override
+      public String getKey() {
+        return name;
+      }
+
+      @Override
+      public V getValue() {
+        return get(name);
+      }
+
+      @Override
+      public V setValue(V value) {
+        throw new UnsupportedOperationException();
+      }
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
index b505fd4..094d093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
@@ -18,19 +18,24 @@
 
 package org.apache.drill.exec.store.sys.local;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.EStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 public class LocalEStoreProvider implements EStoreProvider{
   private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = Maps.newConcurrentMap();
 
   @Override
-  public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException {
+  public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
+    Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral.");
+
     if (! (estores.containsKey(storeConfig)) ) {
       EStore<V> p = new MapEStore<V>();
       EStore<?> p2 = estores.putIfAbsent(storeConfig, p);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
deleted file mode 100644
index c10f862..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.local;
-
-import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class LocalPStore<V> implements PStore<V> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStore.class);
-
-  private static final String BLOB_QUALIFIER = "blob";
-
-  private final File basePath;
-  private final File blobPath;
-  private final PStoreConfig<V> config;
-  public LocalPStore(File base, PStoreConfig<V> config) {
-    super();
-    this.basePath = new File(base, config.getName());
-    this.blobPath = new File(basePath, BLOB_QUALIFIER);
-    if (!blobPath.exists()) {
-      blobPath.mkdirs();
-    }
-    this.config = config;
-  }
-
-  @Override
-  public Iterator<Entry<String, V>> iterator() {
-    String[] f = basePath.list();
-    if (f == null) {
-      return Collections.emptyIterator();
-    }
-    List<String> files = Lists.newArrayList();
-    for (String s : f) {
-      if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
-        files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
-      }
-    }
-
-    return new Iter(files.iterator());
-  }
-
-  private File p(String name, boolean blob) throws IOException {
-    Preconditions.checkArgument(
-        !name.contains("/") &&
-        !name.contains(":") &&
-        !name.contains(".."));
-
-    File f = new File(blob ? blobPath : basePath, name + DRILL_SYS_FILE_SUFFIX);
-    // do this to check file name.
-    f.getCanonicalPath();
-    return f;
-  }
-
-  @Override
-  public V get(String key) {
-    return get(key, false);
-  }
-
-  @Override
-  public V getBlob(String key) {
-    return get(key, true);
-  }
-
-  protected V get(String key, boolean blob) {
-    try (InputStream is = new FileInputStream(p(key, blob))) {
-      return config.getSerializer().deserialize(IOUtils.toByteArray(is));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void put(String key, V value) {
-    put(key, false, value);
-  }
-
-  @Override
-  public void putBlob(String key, V value) {
-    put(key, true, value);
-  }
-
-  protected void put(String key, boolean blob, V value) {
-    try (OutputStream os = new FileOutputStream(p(key, blob))) {
-      IOUtils.write(config.getSerializer().serialize(value), os);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public boolean putIfAbsent(String key, V value) {
-    try {
-      File f = p(key, false);
-      if (f.exists()) {
-        return false;
-      } else {
-        put(key, value);
-        return true;
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void delete(String key) {
-    try {
-      delete(key, false);
-      delete(key, true);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  protected void delete(String key, boolean blob) throws IOException {
-    try {
-      p(key, blob).delete();
-    } catch (FileNotFoundException e) { /* ignored */ }
-  }
-
-  private class Iter implements Iterator<Entry<String, V>>{
-
-    private Iterator<String> keys;
-    private String current;
-
-    public Iter(Iterator<String> keys) {
-      super();
-      this.keys = keys;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return keys.hasNext();
-    }
-
-    @Override
-    public Entry<String, V> next() {
-      current = keys.next();
-      return new DeferredEntry(current);
-    }
-
-    @Override
-    public void remove() {
-      delete(current);
-      keys.remove();
-    }
-
-    private class DeferredEntry implements Entry<String, V> {
-
-      private String name;
-
-      public DeferredEntry(String name) {
-        super();
-        this.name = name;
-      }
-
-      @Override
-      public String getKey() {
-        return name;
-      }
-
-      @Override
-      public V getValue() {
-        return get(name);
-      }
-
-      @Override
-      public V setValue(V value) {
-        throw new UnsupportedOperationException();
-      }
-
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
index c413866..ac53a61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
@@ -17,19 +17,19 @@
  */
 package org.apache.drill.exec.store.sys.local;
 
-import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
 import org.apache.drill.exec.store.sys.PStoreProvider;
-
-import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.hadoop.fs.Path;
 
 /**
  * A really simple provider that stores data in the local file system, one value per file.
@@ -37,21 +37,21 @@ import com.google.common.collect.Maps;
 public class LocalPStoreProvider implements PStoreProvider {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStoreProvider.class);
 
-  private File path;
+  private final Path path;
   private final boolean enableWrite;
-  private ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores;
+  private final ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores;
   private final LocalEStoreProvider estoreProvider;
+  private final DrillFileSystem fs;
 
-  public LocalPStoreProvider(DrillConfig config) {
-    path = new File(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
-    enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE);
-    if (!enableWrite) {
-      pstores = Maps.newConcurrentMap();
-    }
-    estoreProvider = new LocalEStoreProvider();
+  public LocalPStoreProvider(DrillConfig config) throws IOException {
+    this.path = new Path(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
+    this.enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE);
+    this.pstores = enableWrite ? null : new ConcurrentHashMap<PStoreConfig<?>, PStore<?>>();
+    this.estoreProvider = new LocalEStoreProvider();
+    this.fs = FilePStore.getFileSystem(config, path);
   }
 
-  public LocalPStoreProvider(PStoreRegistry registry) {
+  public LocalPStoreProvider(PStoreRegistry registry) throws IOException {
     this(registry.getConfig());
   }
 
@@ -60,14 +60,22 @@ public class LocalPStoreProvider implements PStoreProvider {
   }
 
   @Override
-  public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException {
-    return estoreProvider.getEStore(storeConfig);
+  public <V> PStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
+    switch(storeConfig.getMode()){
+    case EPHEMERAL:
+      return estoreProvider.getStore(storeConfig);
+    case BLOB_PERSISTENT:
+    case PERSISTENT:
+      return getPStore(storeConfig);
+    default:
+      throw new IllegalStateException();
+    }
+
   }
 
-  @Override
-  public <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException {
+  private <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException {
     if (enableWrite) {
-      return new LocalPStore<V>(path, storeConfig);
+      return new FilePStore<V>(fs, path, storeConfig);
     } else {
       PStore<V> p = new NoWriteLocalPStore<V>();
       PStore<?> p2 = pstores.putIfAbsent(storeConfig, p);
@@ -78,6 +86,7 @@ public class LocalPStoreProvider implements PStoreProvider {
     }
   }
 
+
   @Override
   public void start() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
index 84e5027..2723916 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
@@ -51,4 +51,10 @@ public class MapEStore <V> implements EStore<V> {
   public Iterator<Map.Entry<String, V>> iterator() {
     return store.entrySet().iterator();
   }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    V out = store.putIfAbsent(key, value);
+    return out == null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
index d1ef931..71a41f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
@@ -47,21 +47,11 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
   }
 
   @Override
-  public V getBlob(String key) {
-    return blobMap.get(key);
-  }
-
-  @Override
   public void put(String key, V value) {
     map.put(key, value);
   }
 
   @Override
-  public void putBlob(String key, V value) {
-    blobMap.put(key, value);
-  }
-
-  @Override
   public boolean putIfAbsent(String key, V value) {
     return null == map.putIfAbsent(key, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
index b88ff74..d61f3b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
@@ -17,16 +17,25 @@
  */
 package org.apache.drill.exec.store.sys.zk;
 
-import com.google.common.base.Preconditions;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.zookeeper.CreateMode;
-
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.zookeeper.CreateMode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 /**
  * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store)
  * @param <V>
@@ -35,6 +44,7 @@ public abstract class ZkAbstractStore<V> {
 
   protected CuratorFramework framework;
   protected PStoreConfig<V> config;
+  private final PathChildrenCache childrenCache;
   private String prefix;
   private String parent;
 
@@ -50,16 +60,19 @@ public abstract class ZkAbstractStore<V> {
       if (framework.checkExists().forPath(parent) == null) {
         framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
       }
+
+      this.childrenCache = new PathChildrenCache(framework, parent, true);
+      this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
+
     } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
+      throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e);
     }
 
   }
 
   public Iterator<Entry<String, V>> iterator() {
     try {
-      List<String> children = framework.getChildren().forPath(parent);
-      return new Iter(children.iterator());
+      return new Iter(childrenCache.getCurrentData());
     } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
     }
@@ -73,10 +86,11 @@ public abstract class ZkAbstractStore<V> {
 
   public V get(String key) {
     try {
-      byte[] bytes = framework.getData().forPath(p(key));
-      if (bytes == null) {
+      ChildData d = childrenCache.getCurrentData(p(key));
+      if(d == null || d.getData() == null){
         return null;
       }
+      byte[] bytes = d.getData();
       return config.getSerializer().deserialize(bytes);
 
     } catch (Exception e) {
@@ -86,11 +100,12 @@ public abstract class ZkAbstractStore<V> {
 
   public void put(String key, V value) {
     try {
-      if (framework.checkExists().forPath(p(key)) != null) {
+      if (childrenCache.getCurrentData(p(key)) != null) {
         framework.setData().forPath(p(key), config.getSerializer().serialize(value));
       } else {
         createNodeInZK(key, value);
       }
+      childrenCache.rebuildNode(p(key));
 
     } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
@@ -100,21 +115,43 @@ public abstract class ZkAbstractStore<V> {
   public void delete(String key) {
     try {
         framework.delete().forPath(p(key));
+        childrenCache.rebuildNode(p(key));
     } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
     }
   }
 
+  public boolean putIfAbsent(String key, V value) {
+    try {
+      if (childrenCache.getCurrentData(p(key)) != null) {
+        return false;
+      } else {
+        createNodeInZK(key, value);
+        childrenCache.rebuildNode(p(key));
+        return true;
+      }
+
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+  }
+
   public abstract void createNodeInZK (String key, V value);
 
   private class Iter implements Iterator<Entry<String, V>>{
 
-    private Iterator<String> keys;
-    private String current;
+    private Iterator<ChildData> keys;
+    private ChildData current;
 
-    public Iter(Iterator<String> keys) {
+    public Iter(List<ChildData> children) {
       super();
-      this.keys = keys;
+      List<ChildData> sortedChildren = Lists.newArrayList(children);
+      Collections.sort(sortedChildren, new Comparator<ChildData>(){
+        @Override
+        public int compare(ChildData o1, ChildData o2) {
+          return o1.getPath().compareTo(o2.getPath());
+        }});
+      this.keys = sortedChildren.iterator();
     }
 
     @Override
@@ -130,27 +167,35 @@ public abstract class ZkAbstractStore<V> {
 
     @Override
     public void remove() {
-      delete(current);
-      keys.remove();
+      delete(keyFromPath(current));
+    }
+
+    private String keyFromPath(ChildData data){
+      String path = data.getPath();
+      return path.substring(prefix.length(), path.length());
     }
 
     private class DeferredEntry implements Entry<String, V>{
 
-      private String name;
+      private ChildData data;
 
-      public DeferredEntry(String name) {
+      public DeferredEntry(ChildData data) {
         super();
-        this.name = name;
+        this.data = data;
       }
 
       @Override
       public String getKey() {
-        return name;
+        return keyFromPath(data);
       }
 
       @Override
       public V getValue() {
-        return get(name);
+        try {
+          return config.getSerializer().deserialize(data.getData());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
index 34b59a7..1c2c3fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
@@ -22,6 +22,9 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.EStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
+
+import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 
@@ -33,7 +36,8 @@ public class ZkEStoreProvider implements EStoreProvider{
   }
 
   @Override
-  public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
+  public <V> EStore<V> getStore(PStoreConfig<V> store) throws IOException {
+    Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL);
     return new ZkEStore<V>(curator,store);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
index 601ba8f..a597381 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
@@ -40,17 +40,9 @@ import com.google.common.base.Preconditions;
  */
 public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{
 
-  private DrillFileSystem fs;
-  private Path blobPath;
-  private boolean blobPathCreated;
-
-  ZkPStore(CuratorFramework framework, DrillFileSystem fs, Path blobRoot, PStoreConfig<V> config)
+  ZkPStore(CuratorFramework framework, PStoreConfig<V> config)
       throws IOException {
     super(framework, config);
-
-    this.fs = fs;
-    this.blobPath = new Path(blobRoot, config.getName());
-    this.blobPathCreated = false;
   }
 
   @Override
@@ -62,53 +54,7 @@ public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{
     }
   }
 
-  @Override
-  public boolean putIfAbsent(String key, V value) {
-    try {
-      if (framework.checkExists().forPath(p(key)) != null) {
-        return false;
-      } else {
-        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
-        return true;
-      }
-
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-  }
-
-  @Override
-  public V getBlob(String key) {
-    try (DrillInputStream is = fs.open(path(key))) {
-      return config.getSerializer().deserialize(IOUtils.toByteArray(is.getInputStream()));
-    } catch (FileNotFoundException e) {
-      return null;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void putBlob(String key, V value) {
-    try (DrillOutputStream os = fs.create(path(key))) {
-      IOUtils.write(config.getSerializer().serialize(value), os.getOuputStream());
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
 
-  private Path path(String name) throws IOException {
-    Preconditions.checkArgument(
-        !name.contains("/") &&
-        !name.contains(":") &&
-        !name.contains(".."));
 
-    if (!blobPathCreated) {
-      fs.mkdirs(blobPath);
-      blobPathCreated = true;
-    }
-
-    return new Path(blobPath, name + DRILL_SYS_FILE_SUFFIX);
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index aa64f53..03d2441 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -21,18 +21,17 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
 import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
 import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.drill.exec.store.sys.local.FilePStore;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -45,9 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider {
   private final CuratorFramework curator;
 
   private final DrillFileSystem fs;
-
   private final Path blobRoot;
-
   private final ZkEStoreProvider zkEStoreProvider;
 
   public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
@@ -59,49 +56,45 @@ public class ZkPStoreProvider implements PStoreProvider {
 
     if (registry.getConfig().hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) {
       blobRoot = new Path(registry.getConfig().getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT));
-    } else {
-      String drillLogDir = System.getenv("DRILL_LOG_DIR");
-      if (drillLogDir == null) {
-        drillLogDir = "/var/log/drill";
-      }
-      blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI());
+    }else{
+      blobRoot = FilePStore.getLogDir();
     }
-    Configuration fsConf = new Configuration();
-    fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
-    try {
-      fs = FileSystemCreator.getFileSystem(registry.getConfig(), fsConf);
-      fs.mkdirs(blobRoot);
-    } catch (IOException e) {
-      throw new DrillbitStartupException("Unable to initialize blob storage.", e);
+
+    try{
+      this.fs = FilePStore.getFileSystem(registry.getConfig(), blobRoot);
+    }catch(IOException e){
+      throw new DrillbitStartupException("Failure while attempting to set up blob store.", e);
     }
 
-    zkEStoreProvider = new ZkEStoreProvider(curator);
+
+    this.zkEStoreProvider = new ZkEStoreProvider(curator);
   }
 
   @VisibleForTesting
-  public ZkPStoreProvider(CuratorFramework curator) {
+  public ZkPStoreProvider(DrillConfig config, CuratorFramework curator) throws IOException {
     this.curator = curator;
-    this.fs = null;
-    String drillLogDir = System.getenv("DRILL_LOG_DIR");
-    if (drillLogDir == null) {
-      drillLogDir = "/var/log/drill";
-    }
-    blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI());
-    zkEStoreProvider = new ZkEStoreProvider(curator);
+    this.blobRoot = FilePStore.getLogDir();
+    this.fs = FilePStore.getFileSystem(config, blobRoot);
+    this.zkEStoreProvider = new ZkEStoreProvider(curator);
   }
 
   @Override
   public void close() {
   }
 
-  @Override
-  public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
-    return zkEStoreProvider.getEStore(store);
-  }
 
   @Override
-  public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
-    return new ZkPStore<V>(curator, fs, blobRoot, store);
+  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+    switch(config.getMode()){
+    case BLOB_PERSISTENT:
+      return new FilePStore<V>(fs, blobRoot, config);
+    case EPHEMERAL:
+      return zkEStoreProvider.getStore(config);
+    case PERSISTENT:
+      return new ZkPStore<V>(curator, config);
+    default:
+      throw new IllegalStateException();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index fc0972f..3228da9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -83,6 +83,16 @@ public class ControlHandlerImpl implements ControlMessageHandler {
       // TODO: Support a type of message that has no response.
       return DataRpcConfig.OK;
 
+    case RpcType.REQ_QUERY_CANCEL_VALUE:
+      QueryId id = get(pBody, QueryId.PARSER);
+      Foreman f = bee.getForemanForQueryId(id);
+      if(f != null){
+        f.cancel();
+        return DataRpcConfig.OK;
+      }else{
+        return DataRpcConfig.FAIL;
+      }
+
     case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE:
       InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
       for(int i =0; i < fragments.getFragmentCount(); i++){
@@ -95,13 +105,9 @@ public class ControlHandlerImpl implements ControlMessageHandler {
       Foreman foreman = bee.getForemanForQueryId(queryId);
       QueryProfile profile;
       if (foreman == null) {
-        try {
-          profile = bee.getContext().getPersistentStoreProvider().getEStore(QueryStatus.RUNNING_QUERY_PROFILE).get(QueryIdHelper.getQueryId(queryId));
-        } catch (IOException e) {
-          throw new RpcException("Failed to get persistent store", e);
-        }
+        throw new RpcException("Query not running on node.");
       } else {
-        profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile(true);
+        profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile();
       }
       return new Response(RpcType.RESP_QUERY_STATUS, profile);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index e47c0be..7a0e501 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -436,7 +436,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   @Override
   public int compareTo(Object o) {
-    return o.hashCode() - o.hashCode();
+    return hashCode() - o.hashCode();
   }
 
 }