You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/11 05:52:08 UTC

[23/61] [abbrv] Adding HBase Persistent Store.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
new file mode 100644
index 0000000..eb21c70
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.zk;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link PStore} implementation that keeps each entry in a persistent znode at
+ * {@code /<store name>/<key>}, encoded with the serializer from the {@link PStoreConfig}.
+ */
+public class ZkPStore<V> implements PStore<V>{
+
+  private final CuratorFramework framework;
+  private final PStoreConfig<V> config;
+  private final String prefix;
+  private final String parent;
+
+  /**
+   * Binds the store to {@code /<config name>} and creates that parent znode if it is missing.
+   *
+   * @param framework Curator client used for every Zookeeper call
+   * @param config supplies the store name and the value serializer
+   * @throws RuntimeException if the parent znode cannot be checked or created
+   */
+  ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
+    this.parent = "/" + config.getName();
+    this.prefix = parent + "/";
+    this.framework = framework;
+    this.config = config;
+
+    // make sure the parent node exists.
+    try{
+      if(framework.checkExists().forPath(parent) == null) {
+        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
+      }
+    }catch(KeeperException.NodeExistsException e){
+      // Another client created the parent between the check and the create; nothing to do.
+    }catch(Exception e){
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+
+  }
+
+  /**
+   * Lists the children of the parent znode once and iterates over that snapshot. Values are
+   * fetched lazily: each entry reads its znode only when {@code getValue()} is called.
+   */
+  @Override
+  public Iterator<Entry<String, V>> iterator() {
+    try{
+      List<String> children = framework.getChildren().forPath(parent);
+      return new Iter(children.iterator());
+    }catch(Exception e){
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+
+  }
+
+  /** Maps a key to its znode path; keys containing a slash are rejected. */
+  private String p(String key){
+    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
+    return prefix + key;
+  }
+
+  /**
+   * Reads and deserializes the value stored under {@code key}.
+   *
+   * NOTE(review): a key whose znode does not exist makes {@code getData()} throw, so it
+   * surfaces here as a RuntimeException rather than {@code null} - confirm callers expect that.
+   *
+   * @return the deserialized value, or {@code null} when the znode carries no data
+   * @throws RuntimeException on any Zookeeper or deserialization failure
+   */
+  @Override
+  public V get(String key) {
+    try{
+      byte[] bytes = framework.getData().forPath(p(key));
+      if(bytes == null){
+        return null;
+      }
+      return config.getSerializer().deserialize(bytes);
+
+    }catch(Exception e){
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Stores {@code value} under {@code key}, creating the znode or overwriting its data.
+   * A create that loses a race to another writer falls back to overwriting.
+   *
+   * @throws RuntimeException on any Zookeeper or serialization failure
+   */
+  @Override
+  public void put(String key, V value) {
+    try{
+      final String path = p(key);
+      final byte[] bytes = config.getSerializer().serialize(value);
+      if(framework.checkExists().forPath(path) == null) {
+        try{
+          framework.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes);
+          return;
+        }catch(KeeperException.NodeExistsException e){
+          // Created concurrently since the check; fall through and overwrite it.
+        }
+      }
+      framework.setData().forPath(path, bytes);
+    }catch(Exception e){
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Creates the entry only when no znode exists for {@code key}.
+   *
+   * @return {@code true} if this call created the entry, {@code false} if it already existed
+   * @throws RuntimeException on any other Zookeeper or serialization failure
+   */
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    try{
+      final String path = p(key);
+      if(framework.checkExists().forPath(path) != null) {
+        return false;
+      }
+      framework.create().withMode(CreateMode.PERSISTENT).forPath(path, config.getSerializer().serialize(value));
+      return true;
+    }catch(KeeperException.NodeExistsException e){
+      // Another writer created the node after the check, so the key was not absent.
+      return false;
+    }catch(Exception e){
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Deletes the znode for {@code key}.
+   *
+   * @throws RuntimeException on any Zookeeper failure
+   */
+  @Override
+  public void delete(String key) {
+    try{
+      framework.delete().forPath(p(key));
+    }catch(Exception e){
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+  }
+
+  /** Iterator over a snapshot of child names; removal deletes the backing znode. */
+  private class Iter implements Iterator<Entry<String, V>>{
+
+    private final Iterator<String> keys;
+    private String current;
+
+    public Iter(Iterator<String> keys) {
+      super();
+      this.keys = keys;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return keys.hasNext();
+    }
+
+    @Override
+    public Entry<String, V> next() {
+      current = keys.next();
+      return new DeferredEntry(current);
+    }
+
+    @Override
+    public void remove() {
+      // Honor the Iterator contract instead of failing inside p() with a null key.
+      if(current == null){
+        throw new IllegalStateException("remove() called without a preceding next()");
+      }
+      delete(current);
+      keys.remove();
+      current = null;
+    }
+
+    /** Entry that remembers only the key and loads the value from the store on demand. */
+    private class DeferredEntry implements Entry<String, V>{
+
+      private final String name;
+
+      public DeferredEntry(String name) {
+        super();
+        this.name = name;
+      }
+
+      @Override
+      public String getKey() {
+        return name;
+      }
+
+      @Override
+      public V getValue() {
+        return get(name);
+      }
+
+      @Override
+      public V setValue(V value) {
+        throw new UnsupportedOperationException();
+      }
+
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
new file mode 100644
index 0000000..f4513c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.zk;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PStoreRegistry;
+
+/**
+ * {@link PStoreProvider} that hands out {@link ZkPStore} instances sharing one Curator client.
+ */
+public class ZkPStoreProvider implements PStoreProvider{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
+
+  private final CuratorFramework curator;
+
+  /**
+   * Takes the Curator client from the registry's cluster coordinator.
+   *
+   * @throws DrillbitStartupException if the coordinator is not a {@link ZKClusterCoordinator}
+   */
+  public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
+    ClusterCoordinator coord = registry.getClusterCoordinator();
+    if (!(coord instanceof ZKClusterCoordinator)) {
+      throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator.");
+    }
+    // Cast the instance that was just type-checked rather than asking the registry again.
+    this.curator = ((ZKClusterCoordinator) coord).getCurator();
+  }
+
+  /** Uses the supplied Curator client; this provider never closes it. */
+  public ZkPStoreProvider(CuratorFramework curator) {
+    this.curator = curator;
+  }
+
+  /** No-op: nothing is released here. */
+  @Override
+  public void close() {
+  }
+
+  /** Creates a new {@link ZkPStore} for the given configuration on every call. */
+  @Override
+  public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
+    return new ZkPStore<V>(curator, store);
+  }
+
+  /** No-op: nothing is started here. */
+  @Override
+  public void start() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
deleted file mode 100644
index e2a6ecf..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.PTable;
-import org.apache.drill.exec.store.sys.PTableConfig;
-import org.apache.zookeeper.CreateMode;
-
-import com.google.common.base.Preconditions;
-
-public class ZkPTable<V> implements PTable<V>{
-
-  private CuratorFramework framework;
-  private PTableConfig<V> config;
-  private String prefix;
-  private String parent;
-
-  ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws IOException {
-    super();
-    this.parent = "/" + config.getName();
-    this.prefix = parent + "/";
-    this.framework = framework;
-    this.config = config;
-
-    // make sure the parent node exists.
-    try{
-      if(framework.checkExists().forPath(parent) == null) {
-        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
-      }
-    }catch(Exception e){
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-
-  }
-
-  @Override
-  public Iterator<Entry<String, V>> iterator() {
-    try{
-      List<String> children = framework.getChildren().forPath(parent);
-      return new Iter(children.iterator());
-    }catch(Exception e){
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-
-  }
-
-  private String p(String key){
-    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
-    return prefix + key;
-  }
-
-  @Override
-  public V get(String key) {
-    try{
-      byte[] bytes = framework.getData().forPath(p(key));
-      if(bytes == null){
-        return null;
-      }
-      return config.getSerializer().deserialize(bytes);
-
-    }catch(Exception e){
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-  }
-
-  @Override
-  public void put(String key, V value) {
-    try{
-      if(framework.checkExists().forPath(p(key)) != null) {
-        framework.setData().forPath(p(key), config.getSerializer().serialize(value));
-      }else{
-        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
-      }
-
-    }catch(Exception e){
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-
-  }
-
-  @Override
-  public boolean putIfAbsent(String key, V value) {
-    try{
-      if(framework.checkExists().forPath(p(key)) != null) {
-        return false;
-      }else{
-        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
-        return true;
-      }
-
-    }catch(Exception e){
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-  }
-
-  @Override
-  public void delete(String key) {
-    try{
-      framework.delete().forPath(p(key));
-    }catch(Exception e){
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-  }
-
-
-  private class Iter implements Iterator<Entry<String, V>>{
-
-    private Iterator<String> keys;
-    private String current;
-
-    public Iter(Iterator<String> keys) {
-      super();
-      this.keys = keys;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return keys.hasNext();
-    }
-
-    @Override
-    public Entry<String, V> next() {
-      current = keys.next();
-      return new DeferredEntry(current);
-    }
-
-    @Override
-    public void remove() {
-      delete(current);
-      keys.remove();
-    }
-
-
-    private class DeferredEntry implements Entry<String, V>{
-
-      private String name;
-
-
-      public DeferredEntry(String name) {
-        super();
-        this.name = name;
-      }
-
-      @Override
-      public String getKey() {
-        return name;
-      }
-
-      @Override
-      public V getValue() {
-        return get(name);
-      }
-
-      @Override
-      public V setValue(V value) {
-        throw new UnsupportedOperationException();
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
deleted file mode 100644
index 8d2e153..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.PTable;
-import org.apache.drill.exec.store.sys.PTableConfig;
-import org.apache.drill.exec.store.sys.TableProvider;
-
-public class ZkTableProvider implements TableProvider{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class);
-
-  private final CuratorFramework curator;
-
-  public ZkTableProvider(CuratorFramework curator){
-    this.curator = curator;
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException {
-    return new ZkPTable<V>(curator, table);
-  }
-
-  @Override
-  public void start() {
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 1eae8c5..71e4e8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler;
 import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.sys.TableProvider;
+import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.batch.ControlHandlerImpl;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
@@ -87,7 +87,7 @@ public class WorkManager implements Closeable{
     this.dataHandler = new DataResponseHandlerImpl(bee);
   }
 
-  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, TableProvider provider){
+  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){
     this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider);
  //   executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
     executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
@@ -113,7 +113,9 @@ public class WorkManager implements Closeable{
   @Override
   public void close() throws IOException {
     try {
-      executor.awaitTermination(1, TimeUnit.SECONDS);
+      if (executor != null) {
+        executor.awaitTermination(1, TimeUnit.SECONDS);
+      }
     } catch (InterruptedException e) {
       logger.warn("Executor interrupted while awaiting termination");
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6a7c9d5..9ce22c7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -94,7 +94,8 @@ drill.exec: {
     affinity.factor: 1.2,
     executor.threads: 4
   },
-  sys.tables: {
+  sys.store.provider: {
+    class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
     local: {
       path: "/tmp/drill",
       write: true

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 547af34..ad114ab 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.local.LocalTableProvider;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 import org.junit.Rule;
 import org.junit.rules.TestRule;
 
@@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{
     final DistributedCache cache = new LocalCache();
     cache.run();
 
-    final LocalTableProvider provider = new LocalTableProvider(config);
+    final LocalPStoreProvider provider = new LocalPStoreProvider(config);
     provider.start();
 
     final SystemOptionManager opt = new SystemOptionManager(config, provider);
@@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{
         result = opt;
         dbContext.getCache();
         result = cache;
-        dbContext.getSystemTableProvider();
+        dbContext.getPersistentStoreProvider();
         result = provider;
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 8fc37f3..199ecfc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.local.LocalTableProvider;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.junit.AfterClass;
@@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
       }
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
-    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalTableProvider(DrillConfig.create()));
+    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
     QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
new file mode 100644
index 0000000..6f7794b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+
+/**
+ * Shared round-trip check that any {@link PStoreProvider} implementation can be run against.
+ */
+public class PStoreTestUtil {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
+
+  /**
+   * Writes two entries to the {@code sys.test} store, verifies that iteration returns exactly
+   * those entries, removes them through the iterator and verifies the store is empty.
+   */
+  public static void test(PStoreProvider provider) throws Exception{
+    PStore<String> store = provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
+    String[] keys = {"first", "second"};
+    String[] values = {"value1", "value2"};
+    Map<String, String> expectedMap = Maps.newHashMap();
+
+    for(int i =0; i < keys.length; i++){
+      expectedMap.put(keys[i], values[i]);
+      store.put(keys[i], values[i]);
+    }
+
+    {
+      // Count what the store returns so a missing or extra entry fails as an assertion.
+      int seen = 0;
+      Iterator<Map.Entry<String, String>> iter = store.iterator();
+      while(iter.hasNext()){
+        Entry<String, String> e = iter.next();
+        assertTrue(expectedMap.containsKey(e.getKey()));
+        assertEquals(expectedMap.get(e.getKey()), e.getValue());
+        seen++;
+      }
+
+      assertEquals(keys.length, seen);
+    }
+
+    {
+      Iterator<Map.Entry<String, String>> iter = store.iterator();
+      while(iter.hasNext()){
+        iter.next();
+        iter.remove();
+      }
+    }
+
+    assertFalse(store.iterator().hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
deleted file mode 100644
index 47a783b..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-
-public class PTableTestUtil {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class);
-
-  public static void test(TableProvider provider) throws Exception{
-    PTable<String> table = provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
-    String[] keys = {"first", "second"};
-    String[] values = {"value1", "value2"};
-    Map<String, String> expectedMap = Maps.newHashMap();
-
-    for(int i =0; i < keys.length; i++){
-      expectedMap.put(keys[i], values[i]);
-      table.put(keys[i], values[i]);
-    }
-
-    {
-      Iterator<Map.Entry<String, String>> iter = table.iterator();
-      for(int i =0; i < keys.length; i++){
-        Entry<String, String> e = iter.next();
-        assertTrue(expectedMap.containsKey(e.getKey()));
-        assertEquals(expectedMap.get(e.getKey()), e.getValue());
-      }
-
-      assertFalse(iter.hasNext());
-    }
-
-    {
-      Iterator<Map.Entry<String, String>> iter = table.iterator();
-      while(iter.hasNext()){
-        iter.next();
-        iter.remove();
-      }
-    }
-
-    assertFalse(table.iterator().hasNext());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
new file mode 100644
index 0000000..18d87c7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.TestWithZookeeper;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
+import org.junit.Test;
+
+public class TestPStoreProviders extends TestWithZookeeper { // runs the shared PStoreTestUtil round trip against each provider
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
+
+  static LocalPStoreProvider provider; // NOTE(review): never assigned or read in this class; shadowed by the local in verifyLocalStore()
+
+  @Test
+  public void verifyLocalStore() throws Exception { // local provider built from the default DrillConfig
+    try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){ // closed when the block exits
+      PStoreTestUtil.test(provider);
+    }
+  }
+
+  @Test
+  public void verifyZkStore() throws Exception { // Zookeeper-backed provider on a Curator client built from the test config
+    DrillConfig config = getConfig();
+    String connect = config.getString(ExecConstants.ZK_CONNECTION);
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+    .namespace(config.getString(ExecConstants.ZK_ROOT))
+    .retryPolicy(new RetryNTimes(1, 100)) // one retry, 100 ms sleep between attempts
+    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+    .connectString(connect);
+
+    try(CuratorFramework curator = builder.build()){ // client is closed when the block exits
+      curator.start(); // started before the provider issues any Zookeeper call
+      ZkPStoreProvider provider = new ZkPStoreProvider(curator);
+      PStoreTestUtil.test(provider);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
deleted file mode 100644
index b7d92fe..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.TestWithZookeeper;
-import org.apache.drill.exec.store.sys.local.LocalTableProvider;
-import org.apache.drill.exec.store.sys.zk.ZkTableProvider;
-import org.junit.Test;
-
-public class TestTableProviders extends TestWithZookeeper {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class);
-
-  static LocalTableProvider provider;
-
-  @Test
-  public void verifyLocalTable() throws Exception {
-    try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){
-      PTableTestUtil.test(provider);
-    }
-  }
-
-  @Test
-  public void verifyZkTable() throws Exception {
-    DrillConfig config = getConfig();
-    String connect = config.getString(ExecConstants.ZK_CONNECTION);
-    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-    .namespace(config.getString(ExecConstants.ZK_ROOT))
-    .retryPolicy(new RetryNTimes(1, 100))
-    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
-    .connectString(connect);
-
-    try(CuratorFramework curator = builder.build()){
-      curator.start();
-      ZkTableProvider provider = new ZkTableProvider(curator);
-      PTableTestUtil.test(provider);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b2289f3..500f0fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,7 @@
           <artifactId>maven-surefire-plugin</artifactId>
           <version>2.17</version>
           <configuration>
-            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
+            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
             <forkCount>4</forkCount>
             <reuseForks>true</reuseForks>
             <additionalClasspathElements>


Re: [23/61] [abbrv] Adding HBase Persistent Store.

Posted by Timothy Chen <tn...@gmail.com>.
Never mind, it's another commit.

Tim

On Tue, Jun 10, 2014 at 9:00 PM, Timothy Chen <tn...@gmail.com> wrote:
> Looks like we added ZK PStore instead of HBase?
>
> Tim
>
> On Tue, Jun 10, 2014 at 8:52 PM,  <ja...@apache.org> wrote:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
>> new file mode 100644
>> index 0000000..eb21c70
>> --- /dev/null
>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
>> @@ -0,0 +1,180 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys.zk;
>> +
>> +import java.io.IOException;
>> +import java.util.Iterator;
>> +import java.util.List;
>> +import java.util.Map.Entry;
>> +
>> +import org.apache.curator.framework.CuratorFramework;
>> +import org.apache.drill.exec.store.sys.PStore;
>> +import org.apache.drill.exec.store.sys.PStoreConfig;
>> +import org.apache.zookeeper.CreateMode;
>> +
>> +import com.google.common.base.Preconditions;
>> +
>> +public class ZkPStore<V> implements PStore<V>{
>> +
>> +  private CuratorFramework framework;
>> +  private PStoreConfig<V> config;
>> +  private String prefix;
>> +  private String parent;
>> +
>> +  ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
>> +    this.parent = "/" + config.getName();
>> +    this.prefix = parent + "/";
>> +    this.framework = framework;
>> +    this.config = config;
>> +
>> +    // make sure the parent node exists.
>> +    try{
>> +      if(framework.checkExists().forPath(parent) == null) {
>> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
>> +      }
>> +    }catch(Exception e){
>> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> +    }
>> +
>> +  }
>> +
>> +  @Override
>> +  public Iterator<Entry<String, V>> iterator() {
>> +    try{
>> +      List<String> children = framework.getChildren().forPath(parent);
>> +      return new Iter(children.iterator());
>> +    }catch(Exception e){
>> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> +    }
>> +
>> +  }
>> +
>> +  private String p(String key){
>> +    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
>> +    return prefix + key;
>> +  }
>> +
>> +  @Override
>> +  public V get(String key) {
>> +    try{
>> +      byte[] bytes = framework.getData().forPath(p(key));
>> +      if(bytes == null){
>> +        return null;
>> +      }
>> +      return config.getSerializer().deserialize(bytes);
>> +
>> +    }catch(Exception e){
>> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void put(String key, V value) {
>> +    try{
>> +      if(framework.checkExists().forPath(p(key)) != null) {
>> +        framework.setData().forPath(p(key), config.getSerializer().serialize(value));
>> +      }else{
>> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> +      }
>> +
>> +    }catch(Exception e){
>> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> +    }
>> +
>> +  }
>> +
>> +  @Override
>> +  public boolean putIfAbsent(String key, V value) {
>> +    try{
>> +      if(framework.checkExists().forPath(p(key)) != null) {
>> +        return false;
>> +      }else{
>> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> +        return true;
>> +      }
>> +
>> +    }catch(Exception e){
>> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> +    }
>> +  }
>> +
>> +  @Override
>> +  public void delete(String key) {
>> +    try{
>> +      framework.delete().forPath(p(key));
>> +    }catch(Exception e){
>> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> +    }
>> +  }
>> +
>> +  private class Iter implements Iterator<Entry<String, V>>{
>> +
>> +    private Iterator<String> keys;
>> +    private String current;
>> +
>> +    public Iter(Iterator<String> keys) {
>> +      super();
>> +      this.keys = keys;
>> +    }
>> +
>> +    @Override
>> +    public boolean hasNext() {
>> +      return keys.hasNext();
>> +    }
>> +
>> +    @Override
>> +    public Entry<String, V> next() {
>> +      current = keys.next();
>> +      return new DeferredEntry(current);
>> +    }
>> +
>> +    @Override
>> +    public void remove() {
>> +      delete(current);
>> +      keys.remove();
>> +    }
>> +
>> +    private class DeferredEntry implements Entry<String, V>{
>> +
>> +      private String name;
>> +
>> +      public DeferredEntry(String name) {
>> +        super();
>> +        this.name = name;
>> +      }
>> +
>> +      @Override
>> +      public String getKey() {
>> +        return name;
>> +      }
>> +
>> +      @Override
>> +      public V getValue() {
>> +        return get(name);
>> +      }
>> +
>> +      @Override
>> +      public V setValue(V value) {
>> +        throw new UnsupportedOperationException();
>> +      }
>> +
>> +    }
>> +
>> +  }
>> +
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
>> new file mode 100644
>> index 0000000..f4513c2
>> --- /dev/null
>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
>> @@ -0,0 +1,61 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys.zk;
>> +
>> +import java.io.IOException;
>> +
>> +import org.apache.curator.framework.CuratorFramework;
>> +import org.apache.drill.exec.coord.ClusterCoordinator;
>> +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
>> +import org.apache.drill.exec.exception.DrillbitStartupException;
>> +import org.apache.drill.exec.store.sys.PStore;
>> +import org.apache.drill.exec.store.sys.PStoreConfig;
>> +import org.apache.drill.exec.store.sys.PStoreProvider;
>> +import org.apache.drill.exec.store.sys.PStoreRegistry;
>> +
>> +public class ZkPStoreProvider implements PStoreProvider{
>> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
>> +
>> +  private final CuratorFramework curator;
>> +
>> +  public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
>> +    ClusterCoordinator coord = registry.getClusterCoordinator();
>> +    if (!(coord instanceof ZKClusterCoordinator)) {
>> +      throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator.");
>> +    }
>> +    this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
>> +  }
>> +
>> +  public ZkPStoreProvider(CuratorFramework curator) {
>> +    this.curator = curator;
>> +  }
>> +
>> +  @Override
>> +  public void close() {
>> +  }
>> +
>> +  @Override
>> +  public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
>> +    return new ZkPStore<V>(curator, store);
>> +  }
>> +
>> +  @Override
>> +  public void start() {
>> +  }
>> +
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
>> deleted file mode 100644
>> index e2a6ecf..0000000
>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
>> +++ /dev/null
>> @@ -1,182 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys.zk;
>> -
>> -import java.io.IOException;
>> -import java.util.Iterator;
>> -import java.util.List;
>> -import java.util.Map.Entry;
>> -
>> -import org.apache.curator.framework.CuratorFramework;
>> -import org.apache.drill.exec.store.sys.PTable;
>> -import org.apache.drill.exec.store.sys.PTableConfig;
>> -import org.apache.zookeeper.CreateMode;
>> -
>> -import com.google.common.base.Preconditions;
>> -
>> -public class ZkPTable<V> implements PTable<V>{
>> -
>> -  private CuratorFramework framework;
>> -  private PTableConfig<V> config;
>> -  private String prefix;
>> -  private String parent;
>> -
>> -  ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws IOException {
>> -    super();
>> -    this.parent = "/" + config.getName();
>> -    this.prefix = parent + "/";
>> -    this.framework = framework;
>> -    this.config = config;
>> -
>> -    // make sure the parent node exists.
>> -    try{
>> -      if(framework.checkExists().forPath(parent) == null) {
>> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
>> -      }
>> -    }catch(Exception e){
>> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> -    }
>> -
>> -  }
>> -
>> -  @Override
>> -  public Iterator<Entry<String, V>> iterator() {
>> -    try{
>> -      List<String> children = framework.getChildren().forPath(parent);
>> -      return new Iter(children.iterator());
>> -    }catch(Exception e){
>> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> -    }
>> -
>> -  }
>> -
>> -  private String p(String key){
>> -    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
>> -    return prefix + key;
>> -  }
>> -
>> -  @Override
>> -  public V get(String key) {
>> -    try{
>> -      byte[] bytes = framework.getData().forPath(p(key));
>> -      if(bytes == null){
>> -        return null;
>> -      }
>> -      return config.getSerializer().deserialize(bytes);
>> -
>> -    }catch(Exception e){
>> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> -    }
>> -  }
>> -
>> -  @Override
>> -  public void put(String key, V value) {
>> -    try{
>> -      if(framework.checkExists().forPath(p(key)) != null) {
>> -        framework.setData().forPath(p(key), config.getSerializer().serialize(value));
>> -      }else{
>> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> -      }
>> -
>> -    }catch(Exception e){
>> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> -    }
>> -
>> -  }
>> -
>> -  @Override
>> -  public boolean putIfAbsent(String key, V value) {
>> -    try{
>> -      if(framework.checkExists().forPath(p(key)) != null) {
>> -        return false;
>> -      }else{
>> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> -        return true;
>> -      }
>> -
>> -    }catch(Exception e){
>> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> -    }
>> -  }
>> -
>> -  @Override
>> -  public void delete(String key) {
>> -    try{
>> -      framework.delete().forPath(p(key));
>> -    }catch(Exception e){
>> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
>> -    }
>> -  }
>> -
>> -
>> -  private class Iter implements Iterator<Entry<String, V>>{
>> -
>> -    private Iterator<String> keys;
>> -    private String current;
>> -
>> -    public Iter(Iterator<String> keys) {
>> -      super();
>> -      this.keys = keys;
>> -    }
>> -
>> -    @Override
>> -    public boolean hasNext() {
>> -      return keys.hasNext();
>> -    }
>> -
>> -    @Override
>> -    public Entry<String, V> next() {
>> -      current = keys.next();
>> -      return new DeferredEntry(current);
>> -    }
>> -
>> -    @Override
>> -    public void remove() {
>> -      delete(current);
>> -      keys.remove();
>> -    }
>> -
>> -
>> -    private class DeferredEntry implements Entry<String, V>{
>> -
>> -      private String name;
>> -
>> -
>> -      public DeferredEntry(String name) {
>> -        super();
>> -        this.name = name;
>> -      }
>> -
>> -      @Override
>> -      public String getKey() {
>> -        return name;
>> -      }
>> -
>> -      @Override
>> -      public V getValue() {
>> -        return get(name);
>> -      }
>> -
>> -      @Override
>> -      public V setValue(V value) {
>> -        throw new UnsupportedOperationException();
>> -      }
>> -
>> -    }
>> -  }
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
>> deleted file mode 100644
>> index 8d2e153..0000000
>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
>> +++ /dev/null
>> @@ -1,50 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys.zk;
>> -
>> -import java.io.IOException;
>> -
>> -import org.apache.curator.framework.CuratorFramework;
>> -import org.apache.drill.exec.store.sys.PTable;
>> -import org.apache.drill.exec.store.sys.PTableConfig;
>> -import org.apache.drill.exec.store.sys.TableProvider;
>> -
>> -public class ZkTableProvider implements TableProvider{
>> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class);
>> -
>> -  private final CuratorFramework curator;
>> -
>> -  public ZkTableProvider(CuratorFramework curator){
>> -    this.curator = curator;
>> -  }
>> -
>> -  @Override
>> -  public void close() {
>> -  }
>> -
>> -  @Override
>> -  public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException {
>> -    return new ZkPTable<V>(curator, table);
>> -  }
>> -
>> -  @Override
>> -  public void start() {
>> -  }
>> -
>> -
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> index 1eae8c5..71e4e8e 100644
>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler;
>>  import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
>>  import org.apache.drill.exec.server.BootStrapContext;
>>  import org.apache.drill.exec.server.DrillbitContext;
>> -import org.apache.drill.exec.store.sys.TableProvider;
>> +import org.apache.drill.exec.store.sys.PStoreProvider;
>>  import org.apache.drill.exec.work.batch.ControlHandlerImpl;
>>  import org.apache.drill.exec.work.batch.ControlMessageHandler;
>>  import org.apache.drill.exec.work.foreman.Foreman;
>> @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{
>>      this.dataHandler = new DataResponseHandlerImpl(bee);
>>    }
>>
>> -  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, TableProvider provider){
>> +  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){
>>      this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider);
>>   //   executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
>>      executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
>> @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{
>>    @Override
>>    public void close() throws IOException {
>>      try {
>> -      executor.awaitTermination(1, TimeUnit.SECONDS);
>> +      if (executor != null) {
>> +        executor.awaitTermination(1, TimeUnit.SECONDS);
>> +      }
>>      } catch (InterruptedException e) {
>>        logger.warn("Executor interrupted while awaiting termination");
>>      }
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
>> index 6a7c9d5..9ce22c7 100644
>> --- a/exec/java-exec/src/main/resources/drill-module.conf
>> +++ b/exec/java-exec/src/main/resources/drill-module.conf
>> @@ -94,7 +94,8 @@ drill.exec: {
>>      affinity.factor: 1.2,
>>      executor.threads: 4
>>    },
>> -  sys.tables: {
>> +  sys.store.provider: {
>> +    class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
>>      local: {
>>        path: "/tmp/drill",
>>        write: true
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> index 547af34..ad114ab 100644
>> --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager;
>>  import org.apache.drill.exec.server.options.SessionOptionManager;
>>  import org.apache.drill.exec.server.options.SystemOptionManager;
>>  import org.apache.drill.exec.store.StoragePluginRegistry;
>> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
>> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>>  import org.junit.Rule;
>>  import org.junit.rules.TestRule;
>>
>> @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{
>>      final DistributedCache cache = new LocalCache();
>>      cache.run();
>>
>> -    final LocalTableProvider provider = new LocalTableProvider(config);
>> +    final LocalPStoreProvider provider = new LocalPStoreProvider(config);
>>      provider.start();
>>
>>      final SystemOptionManager opt = new SystemOptionManager(config, provider);
>> @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{
>>          result = opt;
>>          dbContext.getCache();
>>          result = cache;
>> -        dbContext.getSystemTableProvider();
>> +        dbContext.getPersistentStoreProvider();
>>          result = provider;
>>        }
>>      };
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> index 8fc37f3..199ecfc 100644
>> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit;
>>  import org.apache.drill.exec.server.DrillbitContext;
>>  import org.apache.drill.exec.server.RemoteServiceSet;
>>  import org.apache.drill.exec.store.StoragePluginRegistry;
>> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
>> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>>  import org.apache.drill.exec.vector.ValueVector;
>>  import org.apache.drill.exec.vector.VarBinaryVector;
>>  import org.junit.AfterClass;
>> @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
>>        }
>>      };
>>      RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
>> -    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalTableProvider(DrillConfig.create()));
>> +    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
>>      QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
>>      PhysicalPlanReader reader = bitContext.getPlanReader();
>>      LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
>> new file mode 100644
>> index 0000000..6f7794b
>> --- /dev/null
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
>> @@ -0,0 +1,66 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys;
>> +
>> +import static org.junit.Assert.assertEquals;
>> +import static org.junit.Assert.assertFalse;
>> +import static org.junit.Assert.assertTrue;
>> +
>> +import java.util.Iterator;
>> +import java.util.Map;
>> +import java.util.Map.Entry;
>> +
>> +import com.fasterxml.jackson.databind.ObjectMapper;
>> +import com.google.common.collect.Maps;
>> +
>> +public class PStoreTestUtil {
>> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
>> +
>> +  public static void test(PStoreProvider provider) throws Exception{
>> +    PStore<String> store = provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
>> +    String[] keys = {"first", "second"};
>> +    String[] values = {"value1", "value2"};
>> +    Map<String, String> expectedMap = Maps.newHashMap();
>> +
>> +    for(int i =0; i < keys.length; i++){
>> +      expectedMap.put(keys[i], values[i]);
>> +      store.put(keys[i], values[i]);
>> +    }
>> +
>> +    {
>> +      Iterator<Map.Entry<String, String>> iter = store.iterator();
>> +      for(int i =0; i < keys.length; i++){
>> +        Entry<String, String> e = iter.next();
>> +        assertTrue(expectedMap.containsKey(e.getKey()));
>> +        assertEquals(expectedMap.get(e.getKey()), e.getValue());
>> +      }
>> +
>> +      assertFalse(iter.hasNext());
>> +    }
>> +
>> +    {
>> +      Iterator<Map.Entry<String, String>> iter = store.iterator();
>> +      while(iter.hasNext()){
>> +        iter.next();
>> +        iter.remove();
>> +      }
>> +    }
>> +
>> +    assertFalse(store.iterator().hasNext());
>> +  }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
>> deleted file mode 100644
>> index 47a783b..0000000
>> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
>> +++ /dev/null
>> @@ -1,66 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys;
>> -
>> -import static org.junit.Assert.assertEquals;
>> -import static org.junit.Assert.assertFalse;
>> -import static org.junit.Assert.assertTrue;
>> -
>> -import java.util.Iterator;
>> -import java.util.Map;
>> -import java.util.Map.Entry;
>> -
>> -import com.fasterxml.jackson.databind.ObjectMapper;
>> -import com.google.common.collect.Maps;
>> -
>> -public class PTableTestUtil {
>> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class);
>> -
>> -  public static void test(TableProvider provider) throws Exception{
>> -    PTable<String> table = provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
>> -    String[] keys = {"first", "second"};
>> -    String[] values = {"value1", "value2"};
>> -    Map<String, String> expectedMap = Maps.newHashMap();
>> -
>> -    for(int i =0; i < keys.length; i++){
>> -      expectedMap.put(keys[i], values[i]);
>> -      table.put(keys[i], values[i]);
>> -    }
>> -
>> -    {
>> -      Iterator<Map.Entry<String, String>> iter = table.iterator();
>> -      for(int i =0; i < keys.length; i++){
>> -        Entry<String, String> e = iter.next();
>> -        assertTrue(expectedMap.containsKey(e.getKey()));
>> -        assertEquals(expectedMap.get(e.getKey()), e.getValue());
>> -      }
>> -
>> -      assertFalse(iter.hasNext());
>> -    }
>> -
>> -    {
>> -      Iterator<Map.Entry<String, String>> iter = table.iterator();
>> -      while(iter.hasNext()){
>> -        iter.next();
>> -        iter.remove();
>> -      }
>> -    }
>> -
>> -    assertFalse(table.iterator().hasNext());
>> -  }
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
>> new file mode 100644
>> index 0000000..18d87c7
>> --- /dev/null
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
>> @@ -0,0 +1,58 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys;
>> +
>> +import org.apache.curator.framework.CuratorFramework;
>> +import org.apache.curator.framework.CuratorFrameworkFactory;
>> +import org.apache.curator.retry.RetryNTimes;
>> +import org.apache.drill.common.config.DrillConfig;
>> +import org.apache.drill.exec.ExecConstants;
>> +import org.apache.drill.exec.TestWithZookeeper;
>> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>> +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
>> +import org.junit.Test;
>> +
>> +public class TestPStoreProviders extends TestWithZookeeper {
>> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
>> +
>> +  static LocalPStoreProvider provider;
>> +
>> +  @Test
>> +  public void verifyLocalStore() throws Exception {
>> +    try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){
>> +      PStoreTestUtil.test(provider);
>> +    }
>> +  }
>> +
>> +  @Test
>> +  public void verifyZkStore() throws Exception {
>> +    DrillConfig config = getConfig();
>> +    String connect = config.getString(ExecConstants.ZK_CONNECTION);
>> +    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
>> +    .namespace(config.getString(ExecConstants.ZK_ROOT))
>> +    .retryPolicy(new RetryNTimes(1, 100))
>> +    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
>> +    .connectString(connect);
>> +
>> +    try(CuratorFramework curator = builder.build()){
>> +      curator.start();
>> +      ZkPStoreProvider provider = new ZkPStoreProvider(curator);
>> +      PStoreTestUtil.test(provider);
>> +    }
>> +  }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
>> deleted file mode 100644
>> index b7d92fe..0000000
>> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
>> +++ /dev/null
>> @@ -1,58 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys;
>> -
>> -import org.apache.curator.framework.CuratorFramework;
>> -import org.apache.curator.framework.CuratorFrameworkFactory;
>> -import org.apache.curator.retry.RetryNTimes;
>> -import org.apache.drill.common.config.DrillConfig;
>> -import org.apache.drill.exec.ExecConstants;
>> -import org.apache.drill.exec.TestWithZookeeper;
>> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
>> -import org.apache.drill.exec.store.sys.zk.ZkTableProvider;
>> -import org.junit.Test;
>> -
>> -public class TestTableProviders extends TestWithZookeeper {
>> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class);
>> -
>> -  static LocalTableProvider provider;
>> -
>> -  @Test
>> -  public void verifyLocalTable() throws Exception {
>> -    try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){
>> -      PTableTestUtil.test(provider);
>> -    }
>> -  }
>> -
>> -  @Test
>> -  public void verifyZkTable() throws Exception {
>> -    DrillConfig config = getConfig();
>> -    String connect = config.getString(ExecConstants.ZK_CONNECTION);
>> -    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
>> -    .namespace(config.getString(ExecConstants.ZK_ROOT))
>> -    .retryPolicy(new RetryNTimes(1, 100))
>> -    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
>> -    .connectString(connect);
>> -
>> -    try(CuratorFramework curator = builder.build()){
>> -      curator.start();
>> -      ZkTableProvider provider = new ZkTableProvider(curator);
>> -      PTableTestUtil.test(provider);
>> -    }
>> -  }
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml
>> ----------------------------------------------------------------------
>> diff --git a/pom.xml b/pom.xml
>> index b2289f3..500f0fd 100644
>> --- a/pom.xml
>> +++ b/pom.xml
>> @@ -262,7 +262,7 @@
>>            <artifactId>maven-surefire-plugin</artifactId>
>>            <version>2.17</version>
>>            <configuration>
>> -            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
>> +            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
>>              <forkCount>4</forkCount>
>>              <reuseForks>true</reuseForks>
>>              <additionalClasspathElements>
>>

Re: [23/61] [abbrv] Adding HBase Persistent Store.

Posted by Timothy Chen <tn...@gmail.com>.
Looks like we added the ZK PStore instead of the HBase one?

Tim

On Tue, Jun 10, 2014 at 8:52 PM,  <ja...@apache.org> wrote:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> new file mode 100644
> index 0000000..eb21c70
> --- /dev/null
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> @@ -0,0 +1,180 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys.zk;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map.Entry;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.drill.exec.store.sys.PStore;
> +import org.apache.drill.exec.store.sys.PStoreConfig;
> +import org.apache.zookeeper.CreateMode;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class ZkPStore<V> implements PStore<V>{
> +
> +  private CuratorFramework framework;
> +  private PStoreConfig<V> config;
> +  private String prefix;
> +  private String parent;
> +
> +  ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
> +    this.parent = "/" + config.getName();
> +    this.prefix = parent + "/";
> +    this.framework = framework;
> +    this.config = config;
> +
> +    // make sure the parent node exists.
> +    try{
> +      if(framework.checkExists().forPath(parent) == null) {
> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
> +      }
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +
> +  }
> +
> +  @Override
> +  public Iterator<Entry<String, V>> iterator() {
> +    try{
> +      List<String> children = framework.getChildren().forPath(parent);
> +      return new Iter(children.iterator());
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +
> +  }
> +
> +  private String p(String key){
> +    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
> +    return prefix + key;
> +  }
> +
> +  @Override
> +  public V get(String key) {
> +    try{
> +      byte[] bytes = framework.getData().forPath(p(key));
> +      if(bytes == null){
> +        return null;
> +      }
> +      return config.getSerializer().deserialize(bytes);
> +
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +  }
> +
> +  @Override
> +  public void put(String key, V value) {
> +    try{
> +      if(framework.checkExists().forPath(p(key)) != null) {
> +        framework.setData().forPath(p(key), config.getSerializer().serialize(value));
> +      }else{
> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> +      }
> +
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +
> +  }
> +
> +  @Override
> +  public boolean putIfAbsent(String key, V value) {
> +    try{
> +      if(framework.checkExists().forPath(p(key)) != null) {
> +        return false;
> +      }else{
> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> +        return true;
> +      }
> +
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +  }
> +
> +  @Override
> +  public void delete(String key) {
> +    try{
> +      framework.delete().forPath(p(key));
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +  }
> +
> +  private class Iter implements Iterator<Entry<String, V>>{
> +
> +    private Iterator<String> keys;
> +    private String current;
> +
> +    public Iter(Iterator<String> keys) {
> +      super();
> +      this.keys = keys;
> +    }
> +
> +    @Override
> +    public boolean hasNext() {
> +      return keys.hasNext();
> +    }
> +
> +    @Override
> +    public Entry<String, V> next() {
> +      current = keys.next();
> +      return new DeferredEntry(current);
> +    }
> +
> +    @Override
> +    public void remove() {
> +      delete(current);
> +      keys.remove();
> +    }
> +
> +    private class DeferredEntry implements Entry<String, V>{
> +
> +      private String name;
> +
> +      public DeferredEntry(String name) {
> +        super();
> +        this.name = name;
> +      }
> +
> +      @Override
> +      public String getKey() {
> +        return name;
> +      }
> +
> +      @Override
> +      public V getValue() {
> +        return get(name);
> +      }
> +
> +      @Override
> +      public V setValue(V value) {
> +        throw new UnsupportedOperationException();
> +      }
> +
> +    }
> +
> +  }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> new file mode 100644
> index 0000000..f4513c2
> --- /dev/null
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> @@ -0,0 +1,61 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys.zk;
> +
> +import java.io.IOException;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.drill.exec.coord.ClusterCoordinator;
> +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
> +import org.apache.drill.exec.exception.DrillbitStartupException;
> +import org.apache.drill.exec.store.sys.PStore;
> +import org.apache.drill.exec.store.sys.PStoreConfig;
> +import org.apache.drill.exec.store.sys.PStoreProvider;
> +import org.apache.drill.exec.store.sys.PStoreRegistry;
> +
> +public class ZkPStoreProvider implements PStoreProvider{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
> +
> +  private final CuratorFramework curator;
> +
> +  public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
> +    ClusterCoordinator coord = registry.getClusterCoordinator();
> +    if (!(coord instanceof ZKClusterCoordinator)) {
> +      throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator.");
> +    }
> +    this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
> +  }
> +
> +  public ZkPStoreProvider(CuratorFramework curator) {
> +    this.curator = curator;
> +  }
> +
> +  @Override
> +  public void close() {
> +  }
> +
> +  @Override
> +  public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
> +    return new ZkPStore<V>(curator, store);
> +  }
> +
> +  @Override
> +  public void start() {
> +  }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> deleted file mode 100644
> index e2a6ecf..0000000
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> +++ /dev/null
> @@ -1,182 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys.zk;
> -
> -import java.io.IOException;
> -import java.util.Iterator;
> -import java.util.List;
> -import java.util.Map.Entry;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.drill.exec.store.sys.PTable;
> -import org.apache.drill.exec.store.sys.PTableConfig;
> -import org.apache.zookeeper.CreateMode;
> -
> -import com.google.common.base.Preconditions;
> -
> -public class ZkPTable<V> implements PTable<V>{
> -
> -  private CuratorFramework framework;
> -  private PTableConfig<V> config;
> -  private String prefix;
> -  private String parent;
> -
> -  ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws IOException {
> -    super();
> -    this.parent = "/" + config.getName();
> -    this.prefix = parent + "/";
> -    this.framework = framework;
> -    this.config = config;
> -
> -    // make sure the parent node exists.
> -    try{
> -      if(framework.checkExists().forPath(parent) == null) {
> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
> -      }
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -
> -  }
> -
> -  @Override
> -  public Iterator<Entry<String, V>> iterator() {
> -    try{
> -      List<String> children = framework.getChildren().forPath(parent);
> -      return new Iter(children.iterator());
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -
> -  }
> -
> -  private String p(String key){
> -    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
> -    return prefix + key;
> -  }
> -
> -  @Override
> -  public V get(String key) {
> -    try{
> -      byte[] bytes = framework.getData().forPath(p(key));
> -      if(bytes == null){
> -        return null;
> -      }
> -      return config.getSerializer().deserialize(bytes);
> -
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -  }
> -
> -  @Override
> -  public void put(String key, V value) {
> -    try{
> -      if(framework.checkExists().forPath(p(key)) != null) {
> -        framework.setData().forPath(p(key), config.getSerializer().serialize(value));
> -      }else{
> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> -      }
> -
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -
> -  }
> -
> -  @Override
> -  public boolean putIfAbsent(String key, V value) {
> -    try{
> -      if(framework.checkExists().forPath(p(key)) != null) {
> -        return false;
> -      }else{
> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> -        return true;
> -      }
> -
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -  }
> -
> -  @Override
> -  public void delete(String key) {
> -    try{
> -      framework.delete().forPath(p(key));
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -  }
> -
> -
> -  private class Iter implements Iterator<Entry<String, V>>{
> -
> -    private Iterator<String> keys;
> -    private String current;
> -
> -    public Iter(Iterator<String> keys) {
> -      super();
> -      this.keys = keys;
> -    }
> -
> -    @Override
> -    public boolean hasNext() {
> -      return keys.hasNext();
> -    }
> -
> -    @Override
> -    public Entry<String, V> next() {
> -      current = keys.next();
> -      return new DeferredEntry(current);
> -    }
> -
> -    @Override
> -    public void remove() {
> -      delete(current);
> -      keys.remove();
> -    }
> -
> -
> -    private class DeferredEntry implements Entry<String, V>{
> -
> -      private String name;
> -
> -
> -      public DeferredEntry(String name) {
> -        super();
> -        this.name = name;
> -      }
> -
> -      @Override
> -      public String getKey() {
> -        return name;
> -      }
> -
> -      @Override
> -      public V getValue() {
> -        return get(name);
> -      }
> -
> -      @Override
> -      public V setValue(V value) {
> -        throw new UnsupportedOperationException();
> -      }
> -
> -    }
> -  }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> deleted file mode 100644
> index 8d2e153..0000000
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> +++ /dev/null
> @@ -1,50 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys.zk;
> -
> -import java.io.IOException;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.drill.exec.store.sys.PTable;
> -import org.apache.drill.exec.store.sys.PTableConfig;
> -import org.apache.drill.exec.store.sys.TableProvider;
> -
> -public class ZkTableProvider implements TableProvider{
> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class);
> -
> -  private final CuratorFramework curator;
> -
> -  public ZkTableProvider(CuratorFramework curator){
> -    this.curator = curator;
> -  }
> -
> -  @Override
> -  public void close() {
> -  }
> -
> -  @Override
> -  public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException {
> -    return new ZkPTable<V>(curator, table);
> -  }
> -
> -  @Override
> -  public void start() {
> -  }
> -
> -
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> index 1eae8c5..71e4e8e 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler;
>  import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
>  import org.apache.drill.exec.server.BootStrapContext;
>  import org.apache.drill.exec.server.DrillbitContext;
> -import org.apache.drill.exec.store.sys.TableProvider;
> +import org.apache.drill.exec.store.sys.PStoreProvider;
>  import org.apache.drill.exec.work.batch.ControlHandlerImpl;
>  import org.apache.drill.exec.work.batch.ControlMessageHandler;
>  import org.apache.drill.exec.work.foreman.Foreman;
> @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{
>      this.dataHandler = new DataResponseHandlerImpl(bee);
>    }
>
> -  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, TableProvider provider){
> +  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){
>      this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider);
>   //   executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
>      executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
> @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{
>    @Override
>    public void close() throws IOException {
>      try {
> -      executor.awaitTermination(1, TimeUnit.SECONDS);
> +      if (executor != null) {
> +        executor.awaitTermination(1, TimeUnit.SECONDS);
> +      }
>      } catch (InterruptedException e) {
>        logger.warn("Executor interrupted while awaiting termination");
>      }
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
> index 6a7c9d5..9ce22c7 100644
> --- a/exec/java-exec/src/main/resources/drill-module.conf
> +++ b/exec/java-exec/src/main/resources/drill-module.conf
> @@ -94,7 +94,8 @@ drill.exec: {
>      affinity.factor: 1.2,
>      executor.threads: 4
>    },
> -  sys.tables: {
> +  sys.store.provider: {
> +    class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
>      local: {
>        path: "/tmp/drill",
>        write: true
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> index 547af34..ad114ab 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager;
>  import org.apache.drill.exec.server.options.SessionOptionManager;
>  import org.apache.drill.exec.server.options.SystemOptionManager;
>  import org.apache.drill.exec.store.StoragePluginRegistry;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>  import org.junit.Rule;
>  import org.junit.rules.TestRule;
>
> @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{
>      final DistributedCache cache = new LocalCache();
>      cache.run();
>
> -    final LocalTableProvider provider = new LocalTableProvider(config);
> +    final LocalPStoreProvider provider = new LocalPStoreProvider(config);
>      provider.start();
>
>      final SystemOptionManager opt = new SystemOptionManager(config, provider);
> @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{
>          result = opt;
>          dbContext.getCache();
>          result = cache;
> -        dbContext.getSystemTableProvider();
> +        dbContext.getPersistentStoreProvider();
>          result = provider;
>        }
>      };
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> index 8fc37f3..199ecfc 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit;
>  import org.apache.drill.exec.server.DrillbitContext;
>  import org.apache.drill.exec.server.RemoteServiceSet;
>  import org.apache.drill.exec.store.StoragePluginRegistry;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>  import org.apache.drill.exec.vector.ValueVector;
>  import org.apache.drill.exec.vector.VarBinaryVector;
>  import org.junit.AfterClass;
> @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
>        }
>      };
>      RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
> -    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalTableProvider(DrillConfig.create()));
> +    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
>      QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
>      PhysicalPlanReader reader = bitContext.getPlanReader();
>      LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> new file mode 100644
> index 0000000..6f7794b
> --- /dev/null
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> @@ -0,0 +1,66 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys;
> +
> +import static org.junit.Assert.assertEquals;
> +import static org.junit.Assert.assertFalse;
> +import static org.junit.Assert.assertTrue;
> +
> +import java.util.Iterator;
> +import java.util.Map;
> +import java.util.Map.Entry;
> +
> +import com.fasterxml.jackson.databind.ObjectMapper;
> +import com.google.common.collect.Maps;
> +
> +public class PStoreTestUtil {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
> +
> +  public static void test(PStoreProvider provider) throws Exception{
> +    PStore<String> store = provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
> +    String[] keys = {"first", "second"};
> +    String[] values = {"value1", "value2"};
> +    Map<String, String> expectedMap = Maps.newHashMap();
> +
> +    for(int i =0; i < keys.length; i++){
> +      expectedMap.put(keys[i], values[i]);
> +      store.put(keys[i], values[i]);
> +    }
> +
> +    {
> +      Iterator<Map.Entry<String, String>> iter = store.iterator();
> +      for(int i =0; i < keys.length; i++){
> +        Entry<String, String> e = iter.next();
> +        assertTrue(expectedMap.containsKey(e.getKey()));
> +        assertEquals(expectedMap.get(e.getKey()), e.getValue());
> +      }
> +
> +      assertFalse(iter.hasNext());
> +    }
> +
> +    {
> +      Iterator<Map.Entry<String, String>> iter = store.iterator();
> +      while(iter.hasNext()){
> +        iter.next();
> +        iter.remove();
> +      }
> +    }
> +
> +    assertFalse(store.iterator().hasNext());
> +  }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> deleted file mode 100644
> index 47a783b..0000000
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> +++ /dev/null
> @@ -1,66 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys;
> -
> -import static org.junit.Assert.assertEquals;
> -import static org.junit.Assert.assertFalse;
> -import static org.junit.Assert.assertTrue;
> -
> -import java.util.Iterator;
> -import java.util.Map;
> -import java.util.Map.Entry;
> -
> -import com.fasterxml.jackson.databind.ObjectMapper;
> -import com.google.common.collect.Maps;
> -
> -public class PTableTestUtil {
> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class);
> -
> -  public static void test(TableProvider provider) throws Exception{
> -    PTable<String> table = provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
> -    String[] keys = {"first", "second"};
> -    String[] values = {"value1", "value2"};
> -    Map<String, String> expectedMap = Maps.newHashMap();
> -
> -    for(int i =0; i < keys.length; i++){
> -      expectedMap.put(keys[i], values[i]);
> -      table.put(keys[i], values[i]);
> -    }
> -
> -    {
> -      Iterator<Map.Entry<String, String>> iter = table.iterator();
> -      for(int i =0; i < keys.length; i++){
> -        Entry<String, String> e = iter.next();
> -        assertTrue(expectedMap.containsKey(e.getKey()));
> -        assertEquals(expectedMap.get(e.getKey()), e.getValue());
> -      }
> -
> -      assertFalse(iter.hasNext());
> -    }
> -
> -    {
> -      Iterator<Map.Entry<String, String>> iter = table.iterator();
> -      while(iter.hasNext()){
> -        iter.next();
> -        iter.remove();
> -      }
> -    }
> -
> -    assertFalse(table.iterator().hasNext());
> -  }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> new file mode 100644
> index 0000000..18d87c7
> --- /dev/null
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> @@ -0,0 +1,58 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.curator.framework.CuratorFrameworkFactory;
> +import org.apache.curator.retry.RetryNTimes;
> +import org.apache.drill.common.config.DrillConfig;
> +import org.apache.drill.exec.ExecConstants;
> +import org.apache.drill.exec.TestWithZookeeper;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
> +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
> +import org.junit.Test;
> +
> +public class TestPStoreProviders extends TestWithZookeeper {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
> +
> +  static LocalPStoreProvider provider;
> +
> +  @Test
> +  public void verifyLocalStore() throws Exception {
> +    try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){
> +      PStoreTestUtil.test(provider);
> +    }
> +  }
> +
> +  @Test
> +  public void verifyZkStore() throws Exception {
> +    DrillConfig config = getConfig();
> +    String connect = config.getString(ExecConstants.ZK_CONNECTION);
> +    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
> +    .namespace(config.getString(ExecConstants.ZK_ROOT))
> +    .retryPolicy(new RetryNTimes(1, 100))
> +    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
> +    .connectString(connect);
> +
> +    try(CuratorFramework curator = builder.build()){
> +      curator.start();
> +      ZkPStoreProvider provider = new ZkPStoreProvider(curator);
> +      PStoreTestUtil.test(provider);
> +    }
> +  }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> deleted file mode 100644
> index b7d92fe..0000000
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> +++ /dev/null
> @@ -1,58 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.curator.framework.CuratorFrameworkFactory;
> -import org.apache.curator.retry.RetryNTimes;
> -import org.apache.drill.common.config.DrillConfig;
> -import org.apache.drill.exec.ExecConstants;
> -import org.apache.drill.exec.TestWithZookeeper;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> -import org.apache.drill.exec.store.sys.zk.ZkTableProvider;
> -import org.junit.Test;
> -
> -public class TestTableProviders extends TestWithZookeeper {
> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class);
> -
> -  static LocalTableProvider provider;
> -
> -  @Test
> -  public void verifyLocalTable() throws Exception {
> -    try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){
> -      PTableTestUtil.test(provider);
> -    }
> -  }
> -
> -  @Test
> -  public void verifyZkTable() throws Exception {
> -    DrillConfig config = getConfig();
> -    String connect = config.getString(ExecConstants.ZK_CONNECTION);
> -    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
> -    .namespace(config.getString(ExecConstants.ZK_ROOT))
> -    .retryPolicy(new RetryNTimes(1, 100))
> -    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
> -    .connectString(connect);
> -
> -    try(CuratorFramework curator = builder.build()){
> -      curator.start();
> -      ZkTableProvider provider = new ZkTableProvider(curator);
> -      PTableTestUtil.test(provider);
> -    }
> -  }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml
> ----------------------------------------------------------------------
> diff --git a/pom.xml b/pom.xml
> index b2289f3..500f0fd 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -262,7 +262,7 @@
>            <artifactId>maven-surefire-plugin</artifactId>
>            <version>2.17</version>
>            <configuration>
> -            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
> +            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
>              <forkCount>4</forkCount>
>              <reuseForks>true</reuseForks>
>              <additionalClasspathElements>
>