You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/11 05:52:08 UTC
[23/61] [abbrv] Adding HBase Persistent Store.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
new file mode 100644
index 0000000..eb21c70
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.zk;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.zookeeper.CreateMode;
+
+import com.google.common.base.Preconditions;
+
+public class ZkPStore<V> implements PStore<V>{
+
+ private CuratorFramework framework;
+ private PStoreConfig<V> config;
+ private String prefix;
+ private String parent;
+
+ ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
+ this.parent = "/" + config.getName();
+ this.prefix = parent + "/";
+ this.framework = framework;
+ this.config = config;
+
+ // make sure the parent node exists.
+ try{
+ if(framework.checkExists().forPath(parent) == null) {
+ framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
+ }
+ }catch(Exception e){
+ throw new RuntimeException("Failure while accessing Zookeeper", e);
+ }
+
+ }
+
+ @Override
+ public Iterator<Entry<String, V>> iterator() {
+ try{
+ List<String> children = framework.getChildren().forPath(parent);
+ return new Iter(children.iterator());
+ }catch(Exception e){
+ throw new RuntimeException("Failure while accessing Zookeeper", e);
+ }
+
+ }
+
+ private String p(String key){
+ Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
+ return prefix + key;
+ }
+
+ @Override
+ public V get(String key) {
+ try{
+ byte[] bytes = framework.getData().forPath(p(key));
+ if(bytes == null){
+ return null;
+ }
+ return config.getSerializer().deserialize(bytes);
+
+ }catch(Exception e){
+ throw new RuntimeException("Failure while accessing Zookeeper", e);
+ }
+ }
+
+ @Override
+ public void put(String key, V value) {
+ try{
+ if(framework.checkExists().forPath(p(key)) != null) {
+ framework.setData().forPath(p(key), config.getSerializer().serialize(value));
+ }else{
+ framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
+ }
+
+ }catch(Exception e){
+ throw new RuntimeException("Failure while accessing Zookeeper", e);
+ }
+
+ }
+
+ @Override
+ public boolean putIfAbsent(String key, V value) {
+ try{
+ if(framework.checkExists().forPath(p(key)) != null) {
+ return false;
+ }else{
+ framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
+ return true;
+ }
+
+ }catch(Exception e){
+ throw new RuntimeException("Failure while accessing Zookeeper", e);
+ }
+ }
+
+ @Override
+ public void delete(String key) {
+ try{
+ framework.delete().forPath(p(key));
+ }catch(Exception e){
+ throw new RuntimeException("Failure while accessing Zookeeper", e);
+ }
+ }
+
+ private class Iter implements Iterator<Entry<String, V>>{
+
+ private Iterator<String> keys;
+ private String current;
+
+ public Iter(Iterator<String> keys) {
+ super();
+ this.keys = keys;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public Entry<String, V> next() {
+ current = keys.next();
+ return new DeferredEntry(current);
+ }
+
+ @Override
+ public void remove() {
+ delete(current);
+ keys.remove();
+ }
+
+ private class DeferredEntry implements Entry<String, V>{
+
+ private String name;
+
+ public DeferredEntry(String name) {
+ super();
+ this.name = name;
+ }
+
+ @Override
+ public String getKey() {
+ return name;
+ }
+
+ @Override
+ public V getValue() {
+ return get(name);
+ }
+
+ @Override
+ public V setValue(V value) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
new file mode 100644
index 0000000..f4513c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.zk;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PStoreRegistry;
+
+public class ZkPStoreProvider implements PStoreProvider{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
+
+ private final CuratorFramework curator;
+
+ public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
+ ClusterCoordinator coord = registry.getClusterCoordinator();
+ if (!(coord instanceof ZKClusterCoordinator)) {
+ throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator.");
+ }
+ this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
+ }
+
+ public ZkPStoreProvider(CuratorFramework curator) {
+ this.curator = curator;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
+ return new ZkPStore<V>(curator, store);
+ }
+
+ @Override
+ public void start() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
deleted file mode 100644
index e2a6ecf..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.PTable;
-import org.apache.drill.exec.store.sys.PTableConfig;
-import org.apache.zookeeper.CreateMode;
-
-import com.google.common.base.Preconditions;
-
-public class ZkPTable<V> implements PTable<V>{
-
- private CuratorFramework framework;
- private PTableConfig<V> config;
- private String prefix;
- private String parent;
-
- ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws IOException {
- super();
- this.parent = "/" + config.getName();
- this.prefix = parent + "/";
- this.framework = framework;
- this.config = config;
-
- // make sure the parent node exists.
- try{
- if(framework.checkExists().forPath(parent) == null) {
- framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
- }
- }catch(Exception e){
- throw new RuntimeException("Failure while accessing Zookeeper", e);
- }
-
- }
-
- @Override
- public Iterator<Entry<String, V>> iterator() {
- try{
- List<String> children = framework.getChildren().forPath(parent);
- return new Iter(children.iterator());
- }catch(Exception e){
- throw new RuntimeException("Failure while accessing Zookeeper", e);
- }
-
- }
-
- private String p(String key){
- Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
- return prefix + key;
- }
-
- @Override
- public V get(String key) {
- try{
- byte[] bytes = framework.getData().forPath(p(key));
- if(bytes == null){
- return null;
- }
- return config.getSerializer().deserialize(bytes);
-
- }catch(Exception e){
- throw new RuntimeException("Failure while accessing Zookeeper", e);
- }
- }
-
- @Override
- public void put(String key, V value) {
- try{
- if(framework.checkExists().forPath(p(key)) != null) {
- framework.setData().forPath(p(key), config.getSerializer().serialize(value));
- }else{
- framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
- }
-
- }catch(Exception e){
- throw new RuntimeException("Failure while accessing Zookeeper", e);
- }
-
- }
-
- @Override
- public boolean putIfAbsent(String key, V value) {
- try{
- if(framework.checkExists().forPath(p(key)) != null) {
- return false;
- }else{
- framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
- return true;
- }
-
- }catch(Exception e){
- throw new RuntimeException("Failure while accessing Zookeeper", e);
- }
- }
-
- @Override
- public void delete(String key) {
- try{
- framework.delete().forPath(p(key));
- }catch(Exception e){
- throw new RuntimeException("Failure while accessing Zookeeper", e);
- }
- }
-
-
- private class Iter implements Iterator<Entry<String, V>>{
-
- private Iterator<String> keys;
- private String current;
-
- public Iter(Iterator<String> keys) {
- super();
- this.keys = keys;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public Entry<String, V> next() {
- current = keys.next();
- return new DeferredEntry(current);
- }
-
- @Override
- public void remove() {
- delete(current);
- keys.remove();
- }
-
-
- private class DeferredEntry implements Entry<String, V>{
-
- private String name;
-
-
- public DeferredEntry(String name) {
- super();
- this.name = name;
- }
-
- @Override
- public String getKey() {
- return name;
- }
-
- @Override
- public V getValue() {
- return get(name);
- }
-
- @Override
- public V setValue(V value) {
- throw new UnsupportedOperationException();
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
deleted file mode 100644
index 8d2e153..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.PTable;
-import org.apache.drill.exec.store.sys.PTableConfig;
-import org.apache.drill.exec.store.sys.TableProvider;
-
-public class ZkTableProvider implements TableProvider{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class);
-
- private final CuratorFramework curator;
-
- public ZkTableProvider(CuratorFramework curator){
- this.curator = curator;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException {
- return new ZkPTable<V>(curator, table);
- }
-
- @Override
- public void start() {
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 1eae8c5..71e4e8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler;
import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.sys.TableProvider;
+import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.batch.ControlHandlerImpl;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.foreman.Foreman;
@@ -87,7 +87,7 @@ public class WorkManager implements Closeable{
this.dataHandler = new DataResponseHandlerImpl(bee);
}
- public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, TableProvider provider){
+ public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){
this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider);
// executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
@@ -113,7 +113,9 @@ public class WorkManager implements Closeable{
@Override
public void close() throws IOException {
try {
- executor.awaitTermination(1, TimeUnit.SECONDS);
+ if (executor != null) {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ }
} catch (InterruptedException e) {
logger.warn("Executor interrupted while awaiting termination");
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6a7c9d5..9ce22c7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -94,7 +94,8 @@ drill.exec: {
affinity.factor: 1.2,
executor.threads: 4
},
- sys.tables: {
+ sys.store.provider: {
+ class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
local: {
path: "/tmp/drill",
write: true
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 547af34..ad114ab 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.local.LocalTableProvider;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
import org.junit.Rule;
import org.junit.rules.TestRule;
@@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{
final DistributedCache cache = new LocalCache();
cache.run();
- final LocalTableProvider provider = new LocalTableProvider(config);
+ final LocalPStoreProvider provider = new LocalPStoreProvider(config);
provider.start();
final SystemOptionManager opt = new SystemOptionManager(config, provider);
@@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{
result = opt;
dbContext.getCache();
result = cache;
- dbContext.getSystemTableProvider();
+ dbContext.getPersistentStoreProvider();
result = provider;
}
};
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 8fc37f3..199ecfc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.local.LocalTableProvider;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.junit.AfterClass;
@@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
}
};
RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
- DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalTableProvider(DrillConfig.create()));
+ DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
PhysicalPlanReader reader = bitContext.getPlanReader();
LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
new file mode 100644
index 0000000..6f7794b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+
+public class PStoreTestUtil {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
+
+ public static void test(PStoreProvider provider) throws Exception{
+ PStore<String> store = provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
+ String[] keys = {"first", "second"};
+ String[] values = {"value1", "value2"};
+ Map<String, String> expectedMap = Maps.newHashMap();
+
+ for(int i =0; i < keys.length; i++){
+ expectedMap.put(keys[i], values[i]);
+ store.put(keys[i], values[i]);
+ }
+
+ {
+ Iterator<Map.Entry<String, String>> iter = store.iterator();
+ for(int i =0; i < keys.length; i++){
+ Entry<String, String> e = iter.next();
+ assertTrue(expectedMap.containsKey(e.getKey()));
+ assertEquals(expectedMap.get(e.getKey()), e.getValue());
+ }
+
+ assertFalse(iter.hasNext());
+ }
+
+ {
+ Iterator<Map.Entry<String, String>> iter = store.iterator();
+ while(iter.hasNext()){
+ iter.next();
+ iter.remove();
+ }
+ }
+
+ assertFalse(store.iterator().hasNext());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
deleted file mode 100644
index 47a783b..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-
-public class PTableTestUtil {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class);
-
- public static void test(TableProvider provider) throws Exception{
- PTable<String> table = provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
- String[] keys = {"first", "second"};
- String[] values = {"value1", "value2"};
- Map<String, String> expectedMap = Maps.newHashMap();
-
- for(int i =0; i < keys.length; i++){
- expectedMap.put(keys[i], values[i]);
- table.put(keys[i], values[i]);
- }
-
- {
- Iterator<Map.Entry<String, String>> iter = table.iterator();
- for(int i =0; i < keys.length; i++){
- Entry<String, String> e = iter.next();
- assertTrue(expectedMap.containsKey(e.getKey()));
- assertEquals(expectedMap.get(e.getKey()), e.getValue());
- }
-
- assertFalse(iter.hasNext());
- }
-
- {
- Iterator<Map.Entry<String, String>> iter = table.iterator();
- while(iter.hasNext()){
- iter.next();
- iter.remove();
- }
- }
-
- assertFalse(table.iterator().hasNext());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
new file mode 100644
index 0000000..18d87c7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.TestWithZookeeper;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
+import org.junit.Test;
+
+public class TestPStoreProviders extends TestWithZookeeper {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
+
+ static LocalPStoreProvider provider;
+
+ @Test
+ public void verifyLocalStore() throws Exception {
+ try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){
+ PStoreTestUtil.test(provider);
+ }
+ }
+
+ @Test
+ public void verifyZkStore() throws Exception {
+ DrillConfig config = getConfig();
+ String connect = config.getString(ExecConstants.ZK_CONNECTION);
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+ .namespace(config.getString(ExecConstants.ZK_ROOT))
+ .retryPolicy(new RetryNTimes(1, 100))
+ .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+ .connectString(connect);
+
+ try(CuratorFramework curator = builder.build()){
+ curator.start();
+ ZkPStoreProvider provider = new ZkPStoreProvider(curator);
+ PStoreTestUtil.test(provider);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
deleted file mode 100644
index b7d92fe..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.TestWithZookeeper;
-import org.apache.drill.exec.store.sys.local.LocalTableProvider;
-import org.apache.drill.exec.store.sys.zk.ZkTableProvider;
-import org.junit.Test;
-
-public class TestTableProviders extends TestWithZookeeper {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class);
-
- static LocalTableProvider provider;
-
- @Test
- public void verifyLocalTable() throws Exception {
- try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){
- PTableTestUtil.test(provider);
- }
- }
-
- @Test
- public void verifyZkTable() throws Exception {
- DrillConfig config = getConfig();
- String connect = config.getString(ExecConstants.ZK_CONNECTION);
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .namespace(config.getString(ExecConstants.ZK_ROOT))
- .retryPolicy(new RetryNTimes(1, 100))
- .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
- .connectString(connect);
-
- try(CuratorFramework curator = builder.build()){
- curator.start();
- ZkTableProvider provider = new ZkTableProvider(curator);
- PTableTestUtil.test(provider);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b2289f3..500f0fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
- <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
+ <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
<forkCount>4</forkCount>
<reuseForks>true</reuseForks>
<additionalClasspathElements>
Re: [23/61] [abbrv] Adding HBase Persistent Store.
Posted by Timothy Chen <tn...@gmail.com>.
Never mind it's another commit.
Tim
On Tue, Jun 10, 2014 at 9:00 PM, Timothy Chen <tn...@gmail.com> wrote:
> Looks like we added ZK Pstore instead of HBase?
>
> Tim
>
> On Tue, Jun 10, 2014 at 8:52 PM, <ja...@apache.org> wrote:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
>> new file mode 100644
>> index 0000000..eb21c70
>> --- /dev/null
>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
>> @@ -0,0 +1,180 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements. See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership. The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys.zk;
>> +
>> +import java.io.IOException;
>> +import java.util.Iterator;
>> +import java.util.List;
>> +import java.util.Map.Entry;
>> +
>> +import org.apache.curator.framework.CuratorFramework;
>> +import org.apache.drill.exec.store.sys.PStore;
>> +import org.apache.drill.exec.store.sys.PStoreConfig;
>> +import org.apache.zookeeper.CreateMode;
>> +
>> +import com.google.common.base.Preconditions;
>> +
>> +public class ZkPStore<V> implements PStore<V>{
>> +
>> + private CuratorFramework framework;
>> + private PStoreConfig<V> config;
>> + private String prefix;
>> + private String parent;
>> +
>> + ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
>> + this.parent = "/" + config.getName();
>> + this.prefix = parent + "/";
>> + this.framework = framework;
>> + this.config = config;
>> +
>> + // make sure the parent node exists.
>> + try{
>> + if(framework.checkExists().forPath(parent) == null) {
>> + framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
>> + }
>> + }catch(Exception e){
>> + throw new RuntimeException("Failure while accessing Zookeeper", e);
>> + }
>> +
>> + }
>> +
>> + @Override
>> + public Iterator<Entry<String, V>> iterator() {
>> + try{
>> + List<String> children = framework.getChildren().forPath(parent);
>> + return new Iter(children.iterator());
>> + }catch(Exception e){
>> + throw new RuntimeException("Failure while accessing Zookeeper", e);
>> + }
>> +
>> + }
>> +
>> + private String p(String key){
>> + Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
>> + return prefix + key;
>> + }
>> +
>> + @Override
>> + public V get(String key) {
>> + try{
>> + byte[] bytes = framework.getData().forPath(p(key));
>> + if(bytes == null){
>> + return null;
>> + }
>> + return config.getSerializer().deserialize(bytes);
>> +
>> + }catch(Exception e){
>> + throw new RuntimeException("Failure while accessing Zookeeper", e);
>> + }
>> + }
>> +
>> + @Override
>> + public void put(String key, V value) {
>> + try{
>> + if(framework.checkExists().forPath(p(key)) != null) {
>> + framework.setData().forPath(p(key), config.getSerializer().serialize(value));
>> + }else{
>> + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> + }
>> +
>> + }catch(Exception e){
>> + throw new RuntimeException("Failure while accessing Zookeeper", e);
>> + }
>> +
>> + }
>> +
>> + @Override
>> + public boolean putIfAbsent(String key, V value) {
>> + try{
>> + if(framework.checkExists().forPath(p(key)) != null) {
>> + return false;
>> + }else{
>> + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> + return true;
>> + }
>> +
>> + }catch(Exception e){
>> + throw new RuntimeException("Failure while accessing Zookeeper", e);
>> + }
>> + }
>> +
>> + @Override
>> + public void delete(String key) {
>> + try{
>> + framework.delete().forPath(p(key));
>> + }catch(Exception e){
>> + throw new RuntimeException("Failure while accessing Zookeeper", e);
>> + }
>> + }
>> +
>> + private class Iter implements Iterator<Entry<String, V>>{
>> +
>> + private Iterator<String> keys;
>> + private String current;
>> +
>> + public Iter(Iterator<String> keys) {
>> + super();
>> + this.keys = keys;
>> + }
>> +
>> + @Override
>> + public boolean hasNext() {
>> + return keys.hasNext();
>> + }
>> +
>> + @Override
>> + public Entry<String, V> next() {
>> + current = keys.next();
>> + return new DeferredEntry(current);
>> + }
>> +
>> + @Override
>> + public void remove() {
>> + delete(current);
>> + keys.remove();
>> + }
>> +
>> + private class DeferredEntry implements Entry<String, V>{
>> +
>> + private String name;
>> +
>> + public DeferredEntry(String name) {
>> + super();
>> + this.name = name;
>> + }
>> +
>> + @Override
>> + public String getKey() {
>> + return name;
>> + }
>> +
>> + @Override
>> + public V getValue() {
>> + return get(name);
>> + }
>> +
>> + @Override
>> + public V setValue(V value) {
>> + throw new UnsupportedOperationException();
>> + }
>> +
>> + }
>> +
>> + }
>> +
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
>> new file mode 100644
>> index 0000000..f4513c2
>> --- /dev/null
>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
>> @@ -0,0 +1,61 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements. See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership. The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys.zk;
>> +
>> +import java.io.IOException;
>> +
>> +import org.apache.curator.framework.CuratorFramework;
>> +import org.apache.drill.exec.coord.ClusterCoordinator;
>> +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
>> +import org.apache.drill.exec.exception.DrillbitStartupException;
>> +import org.apache.drill.exec.store.sys.PStore;
>> +import org.apache.drill.exec.store.sys.PStoreConfig;
>> +import org.apache.drill.exec.store.sys.PStoreProvider;
>> +import org.apache.drill.exec.store.sys.PStoreRegistry;
>> +
>> +public class ZkPStoreProvider implements PStoreProvider{
>> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
>> +
>> + private final CuratorFramework curator;
>> +
>> + public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
>> + ClusterCoordinator coord = registry.getClusterCoordinator();
>> + if (!(coord instanceof ZKClusterCoordinator)) {
>> + throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator.");
>> + }
>> + this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
>> + }
>> +
>> + public ZkPStoreProvider(CuratorFramework curator) {
>> + this.curator = curator;
>> + }
>> +
>> + @Override
>> + public void close() {
>> + }
>> +
>> + @Override
>> + public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
>> + return new ZkPStore<V>(curator, store);
>> + }
>> +
>> + @Override
>> + public void start() {
>> + }
>> +
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
>> deleted file mode 100644
>> index e2a6ecf..0000000
>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
>> +++ /dev/null
>> @@ -1,182 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys.zk;
>> -
>> -import java.io.IOException;
>> -import java.util.Iterator;
>> -import java.util.List;
>> -import java.util.Map.Entry;
>> -
>> -import org.apache.curator.framework.CuratorFramework;
>> -import org.apache.drill.exec.store.sys.PTable;
>> -import org.apache.drill.exec.store.sys.PTableConfig;
>> -import org.apache.zookeeper.CreateMode;
>> -
>> -import com.google.common.base.Preconditions;
>> -
>> -public class ZkPTable<V> implements PTable<V>{
>> -
>> - private CuratorFramework framework;
>> - private PTableConfig<V> config;
>> - private String prefix;
>> - private String parent;
>> -
>> - ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws IOException {
>> - super();
>> - this.parent = "/" + config.getName();
>> - this.prefix = parent + "/";
>> - this.framework = framework;
>> - this.config = config;
>> -
>> - // make sure the parent node exists.
>> - try{
>> - if(framework.checkExists().forPath(parent) == null) {
>> - framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
>> - }
>> - }catch(Exception e){
>> - throw new RuntimeException("Failure while accessing Zookeeper", e);
>> - }
>> -
>> - }
>> -
>> - @Override
>> - public Iterator<Entry<String, V>> iterator() {
>> - try{
>> - List<String> children = framework.getChildren().forPath(parent);
>> - return new Iter(children.iterator());
>> - }catch(Exception e){
>> - throw new RuntimeException("Failure while accessing Zookeeper", e);
>> - }
>> -
>> - }
>> -
>> - private String p(String key){
>> - Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
>> - return prefix + key;
>> - }
>> -
>> - @Override
>> - public V get(String key) {
>> - try{
>> - byte[] bytes = framework.getData().forPath(p(key));
>> - if(bytes == null){
>> - return null;
>> - }
>> - return config.getSerializer().deserialize(bytes);
>> -
>> - }catch(Exception e){
>> - throw new RuntimeException("Failure while accessing Zookeeper", e);
>> - }
>> - }
>> -
>> - @Override
>> - public void put(String key, V value) {
>> - try{
>> - if(framework.checkExists().forPath(p(key)) != null) {
>> - framework.setData().forPath(p(key), config.getSerializer().serialize(value));
>> - }else{
>> - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> - }
>> -
>> - }catch(Exception e){
>> - throw new RuntimeException("Failure while accessing Zookeeper", e);
>> - }
>> -
>> - }
>> -
>> - @Override
>> - public boolean putIfAbsent(String key, V value) {
>> - try{
>> - if(framework.checkExists().forPath(p(key)) != null) {
>> - return false;
>> - }else{
>> - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
>> - return true;
>> - }
>> -
>> - }catch(Exception e){
>> - throw new RuntimeException("Failure while accessing Zookeeper", e);
>> - }
>> - }
>> -
>> - @Override
>> - public void delete(String key) {
>> - try{
>> - framework.delete().forPath(p(key));
>> - }catch(Exception e){
>> - throw new RuntimeException("Failure while accessing Zookeeper", e);
>> - }
>> - }
>> -
>> -
>> - private class Iter implements Iterator<Entry<String, V>>{
>> -
>> - private Iterator<String> keys;
>> - private String current;
>> -
>> - public Iter(Iterator<String> keys) {
>> - super();
>> - this.keys = keys;
>> - }
>> -
>> - @Override
>> - public boolean hasNext() {
>> - return keys.hasNext();
>> - }
>> -
>> - @Override
>> - public Entry<String, V> next() {
>> - current = keys.next();
>> - return new DeferredEntry(current);
>> - }
>> -
>> - @Override
>> - public void remove() {
>> - delete(current);
>> - keys.remove();
>> - }
>> -
>> -
>> - private class DeferredEntry implements Entry<String, V>{
>> -
>> - private String name;
>> -
>> -
>> - public DeferredEntry(String name) {
>> - super();
>> - this.name = name;
>> - }
>> -
>> - @Override
>> - public String getKey() {
>> - return name;
>> - }
>> -
>> - @Override
>> - public V getValue() {
>> - return get(name);
>> - }
>> -
>> - @Override
>> - public V setValue(V value) {
>> - throw new UnsupportedOperationException();
>> - }
>> -
>> - }
>> - }
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
>> deleted file mode 100644
>> index 8d2e153..0000000
>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
>> +++ /dev/null
>> @@ -1,50 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys.zk;
>> -
>> -import java.io.IOException;
>> -
>> -import org.apache.curator.framework.CuratorFramework;
>> -import org.apache.drill.exec.store.sys.PTable;
>> -import org.apache.drill.exec.store.sys.PTableConfig;
>> -import org.apache.drill.exec.store.sys.TableProvider;
>> -
>> -public class ZkTableProvider implements TableProvider{
>> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class);
>> -
>> - private final CuratorFramework curator;
>> -
>> - public ZkTableProvider(CuratorFramework curator){
>> - this.curator = curator;
>> - }
>> -
>> - @Override
>> - public void close() {
>> - }
>> -
>> - @Override
>> - public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException {
>> - return new ZkPTable<V>(curator, table);
>> - }
>> -
>> - @Override
>> - public void start() {
>> - }
>> -
>> -
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> index 1eae8c5..71e4e8e 100644
>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
>> @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler;
>> import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
>> import org.apache.drill.exec.server.BootStrapContext;
>> import org.apache.drill.exec.server.DrillbitContext;
>> -import org.apache.drill.exec.store.sys.TableProvider;
>> +import org.apache.drill.exec.store.sys.PStoreProvider;
>> import org.apache.drill.exec.work.batch.ControlHandlerImpl;
>> import org.apache.drill.exec.work.batch.ControlMessageHandler;
>> import org.apache.drill.exec.work.foreman.Foreman;
>> @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{
>> this.dataHandler = new DataResponseHandlerImpl(bee);
>> }
>>
>> - public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, TableProvider provider){
>> + public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){
>> this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider);
>> // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
>> executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
>> @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{
>> @Override
>> public void close() throws IOException {
>> try {
>> - executor.awaitTermination(1, TimeUnit.SECONDS);
>> + if (executor != null) {
>> + executor.awaitTermination(1, TimeUnit.SECONDS);
>> + }
>> } catch (InterruptedException e) {
>> logger.warn("Executor interrupted while awaiting termination");
>> }
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
>> index 6a7c9d5..9ce22c7 100644
>> --- a/exec/java-exec/src/main/resources/drill-module.conf
>> +++ b/exec/java-exec/src/main/resources/drill-module.conf
>> @@ -94,7 +94,8 @@ drill.exec: {
>> affinity.factor: 1.2,
>> executor.threads: 4
>> },
>> - sys.tables: {
>> + sys.store.provider: {
>> + class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
>> local: {
>> path: "/tmp/drill",
>> write: true
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> index 547af34..ad114ab 100644
>> --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
>> @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager;
>> import org.apache.drill.exec.server.options.SessionOptionManager;
>> import org.apache.drill.exec.server.options.SystemOptionManager;
>> import org.apache.drill.exec.store.StoragePluginRegistry;
>> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
>> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>> import org.junit.Rule;
>> import org.junit.rules.TestRule;
>>
>> @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{
>> final DistributedCache cache = new LocalCache();
>> cache.run();
>>
>> - final LocalTableProvider provider = new LocalTableProvider(config);
>> + final LocalPStoreProvider provider = new LocalPStoreProvider(config);
>> provider.start();
>>
>> final SystemOptionManager opt = new SystemOptionManager(config, provider);
>> @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{
>> result = opt;
>> dbContext.getCache();
>> result = cache;
>> - dbContext.getSystemTableProvider();
>> + dbContext.getPersistentStoreProvider();
>> result = provider;
>> }
>> };
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> index 8fc37f3..199ecfc 100644
>> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>> @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit;
>> import org.apache.drill.exec.server.DrillbitContext;
>> import org.apache.drill.exec.server.RemoteServiceSet;
>> import org.apache.drill.exec.store.StoragePluginRegistry;
>> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
>> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>> import org.apache.drill.exec.vector.ValueVector;
>> import org.apache.drill.exec.vector.VarBinaryVector;
>> import org.junit.AfterClass;
>> @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
>> }
>> };
>> RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
>> - DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalTableProvider(DrillConfig.create()));
>> + DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
>> QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
>> PhysicalPlanReader reader = bitContext.getPlanReader();
>> LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
>> new file mode 100644
>> index 0000000..6f7794b
>> --- /dev/null
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
>> @@ -0,0 +1,66 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements. See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership. The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys;
>> +
>> +import static org.junit.Assert.assertEquals;
>> +import static org.junit.Assert.assertFalse;
>> +import static org.junit.Assert.assertTrue;
>> +
>> +import java.util.Iterator;
>> +import java.util.Map;
>> +import java.util.Map.Entry;
>> +
>> +import com.fasterxml.jackson.databind.ObjectMapper;
>> +import com.google.common.collect.Maps;
>> +
>> +public class PStoreTestUtil {
>> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
>> +
>> + public static void test(PStoreProvider provider) throws Exception{
>> + PStore<String> store = provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
>> + String[] keys = {"first", "second"};
>> + String[] values = {"value1", "value2"};
>> + Map<String, String> expectedMap = Maps.newHashMap();
>> +
>> + for(int i =0; i < keys.length; i++){
>> + expectedMap.put(keys[i], values[i]);
>> + store.put(keys[i], values[i]);
>> + }
>> +
>> + {
>> + Iterator<Map.Entry<String, String>> iter = store.iterator();
>> + for(int i =0; i < keys.length; i++){
>> + Entry<String, String> e = iter.next();
>> + assertTrue(expectedMap.containsKey(e.getKey()));
>> + assertEquals(expectedMap.get(e.getKey()), e.getValue());
>> + }
>> +
>> + assertFalse(iter.hasNext());
>> + }
>> +
>> + {
>> + Iterator<Map.Entry<String, String>> iter = store.iterator();
>> + while(iter.hasNext()){
>> + iter.next();
>> + iter.remove();
>> + }
>> + }
>> +
>> + assertFalse(store.iterator().hasNext());
>> + }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
>> deleted file mode 100644
>> index 47a783b..0000000
>> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
>> +++ /dev/null
>> @@ -1,66 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys;
>> -
>> -import static org.junit.Assert.assertEquals;
>> -import static org.junit.Assert.assertFalse;
>> -import static org.junit.Assert.assertTrue;
>> -
>> -import java.util.Iterator;
>> -import java.util.Map;
>> -import java.util.Map.Entry;
>> -
>> -import com.fasterxml.jackson.databind.ObjectMapper;
>> -import com.google.common.collect.Maps;
>> -
>> -public class PTableTestUtil {
>> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class);
>> -
>> - public static void test(TableProvider provider) throws Exception{
>> - PTable<String> table = provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
>> - String[] keys = {"first", "second"};
>> - String[] values = {"value1", "value2"};
>> - Map<String, String> expectedMap = Maps.newHashMap();
>> -
>> - for(int i =0; i < keys.length; i++){
>> - expectedMap.put(keys[i], values[i]);
>> - table.put(keys[i], values[i]);
>> - }
>> -
>> - {
>> - Iterator<Map.Entry<String, String>> iter = table.iterator();
>> - for(int i =0; i < keys.length; i++){
>> - Entry<String, String> e = iter.next();
>> - assertTrue(expectedMap.containsKey(e.getKey()));
>> - assertEquals(expectedMap.get(e.getKey()), e.getValue());
>> - }
>> -
>> - assertFalse(iter.hasNext());
>> - }
>> -
>> - {
>> - Iterator<Map.Entry<String, String>> iter = table.iterator();
>> - while(iter.hasNext()){
>> - iter.next();
>> - iter.remove();
>> - }
>> - }
>> -
>> - assertFalse(table.iterator().hasNext());
>> - }
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
>> new file mode 100644
>> index 0000000..18d87c7
>> --- /dev/null
>> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
>> @@ -0,0 +1,58 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements. See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership. The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.drill.exec.store.sys;
>> +
>> +import org.apache.curator.framework.CuratorFramework;
>> +import org.apache.curator.framework.CuratorFrameworkFactory;
>> +import org.apache.curator.retry.RetryNTimes;
>> +import org.apache.drill.common.config.DrillConfig;
>> +import org.apache.drill.exec.ExecConstants;
>> +import org.apache.drill.exec.TestWithZookeeper;
>> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>> +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
>> +import org.junit.Test;
>> +
>> +public class TestPStoreProviders extends TestWithZookeeper {
>> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
>> +
>> + static LocalPStoreProvider provider;
>> +
>> + @Test
>> + public void verifyLocalStore() throws Exception {
>> + try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){
>> + PStoreTestUtil.test(provider);
>> + }
>> + }
>> +
>> + @Test
>> + public void verifyZkStore() throws Exception {
>> + DrillConfig config = getConfig();
>> + String connect = config.getString(ExecConstants.ZK_CONNECTION);
>> + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
>> + .namespace(config.getString(ExecConstants.ZK_ROOT))
>> + .retryPolicy(new RetryNTimes(1, 100))
>> + .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
>> + .connectString(connect);
>> +
>> + try(CuratorFramework curator = builder.build()){
>> + curator.start();
>> + ZkPStoreProvider provider = new ZkPStoreProvider(curator);
>> + PStoreTestUtil.test(provider);
>> + }
>> + }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
>> ----------------------------------------------------------------------
>> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
>> deleted file mode 100644
>> index b7d92fe..0000000
>> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
>> +++ /dev/null
>> @@ -1,58 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.drill.exec.store.sys;
>> -
>> -import org.apache.curator.framework.CuratorFramework;
>> -import org.apache.curator.framework.CuratorFrameworkFactory;
>> -import org.apache.curator.retry.RetryNTimes;
>> -import org.apache.drill.common.config.DrillConfig;
>> -import org.apache.drill.exec.ExecConstants;
>> -import org.apache.drill.exec.TestWithZookeeper;
>> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
>> -import org.apache.drill.exec.store.sys.zk.ZkTableProvider;
>> -import org.junit.Test;
>> -
>> -public class TestTableProviders extends TestWithZookeeper {
>> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class);
>> -
>> - static LocalTableProvider provider;
>> -
>> - @Test
>> - public void verifyLocalTable() throws Exception {
>> - try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){
>> - PTableTestUtil.test(provider);
>> - }
>> - }
>> -
>> - @Test
>> - public void verifyZkTable() throws Exception {
>> - DrillConfig config = getConfig();
>> - String connect = config.getString(ExecConstants.ZK_CONNECTION);
>> - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
>> - .namespace(config.getString(ExecConstants.ZK_ROOT))
>> - .retryPolicy(new RetryNTimes(1, 100))
>> - .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
>> - .connectString(connect);
>> -
>> - try(CuratorFramework curator = builder.build()){
>> - curator.start();
>> - ZkTableProvider provider = new ZkTableProvider(curator);
>> - PTableTestUtil.test(provider);
>> - }
>> - }
>> -}
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml
>> ----------------------------------------------------------------------
>> diff --git a/pom.xml b/pom.xml
>> index b2289f3..500f0fd 100644
>> --- a/pom.xml
>> +++ b/pom.xml
>> @@ -262,7 +262,7 @@
>> <artifactId>maven-surefire-plugin</artifactId>
>> <version>2.17</version>
>> <configuration>
>> - <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
>> + <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
>> <forkCount>4</forkCount>
>> <reuseForks>true</reuseForks>
>> <additionalClasspathElements>
>>
Re: [23/61] [abbrv] Adding HBase Persistent Store.
Posted by Timothy Chen <tn...@gmail.com>.
Looks like we added ZK Pstore instead of HBase?
Tim
On Tue, Jun 10, 2014 at 8:52 PM, <ja...@apache.org> wrote:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> new file mode 100644
> index 0000000..eb21c70
> --- /dev/null
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> @@ -0,0 +1,180 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys.zk;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map.Entry;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.drill.exec.store.sys.PStore;
> +import org.apache.drill.exec.store.sys.PStoreConfig;
> +import org.apache.zookeeper.CreateMode;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class ZkPStore<V> implements PStore<V>{
> +
> + private CuratorFramework framework;
> + private PStoreConfig<V> config;
> + private String prefix;
> + private String parent;
> +
> + ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
> + this.parent = "/" + config.getName();
> + this.prefix = parent + "/";
> + this.framework = framework;
> + this.config = config;
> +
> + // make sure the parent node exists.
> + try{
> + if(framework.checkExists().forPath(parent) == null) {
> + framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
> + }
> + }catch(Exception e){
> + throw new RuntimeException("Failure while accessing Zookeeper", e);
> + }
> +
> + }
> +
> + @Override
> + public Iterator<Entry<String, V>> iterator() {
> + try{
> + List<String> children = framework.getChildren().forPath(parent);
> + return new Iter(children.iterator());
> + }catch(Exception e){
> + throw new RuntimeException("Failure while accessing Zookeeper", e);
> + }
> +
> + }
> +
> + private String p(String key){
> + Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
> + return prefix + key;
> + }
> +
> + @Override
> + public V get(String key) {
> + try{
> + byte[] bytes = framework.getData().forPath(p(key));
> + if(bytes == null){
> + return null;
> + }
> + return config.getSerializer().deserialize(bytes);
> +
> + }catch(Exception e){
> + throw new RuntimeException("Failure while accessing Zookeeper", e);
> + }
> + }
> +
> + @Override
> + public void put(String key, V value) {
> + try{
> + if(framework.checkExists().forPath(p(key)) != null) {
> + framework.setData().forPath(p(key), config.getSerializer().serialize(value));
> + }else{
> + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> + }
> +
> + }catch(Exception e){
> + throw new RuntimeException("Failure while accessing Zookeeper", e);
> + }
> +
> + }
> +
> + @Override
> + public boolean putIfAbsent(String key, V value) {
> + try{
> + if(framework.checkExists().forPath(p(key)) != null) {
> + return false;
> + }else{
> + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> + return true;
> + }
> +
> + }catch(Exception e){
> + throw new RuntimeException("Failure while accessing Zookeeper", e);
> + }
> + }
> +
> + @Override
> + public void delete(String key) {
> + try{
> + framework.delete().forPath(p(key));
> + }catch(Exception e){
> + throw new RuntimeException("Failure while accessing Zookeeper", e);
> + }
> + }
> +
> + private class Iter implements Iterator<Entry<String, V>>{
> +
> + private Iterator<String> keys;
> + private String current;
> +
> + public Iter(Iterator<String> keys) {
> + super();
> + this.keys = keys;
> + }
> +
> + @Override
> + public boolean hasNext() {
> + return keys.hasNext();
> + }
> +
> + @Override
> + public Entry<String, V> next() {
> + current = keys.next();
> + return new DeferredEntry(current);
> + }
> +
> + @Override
> + public void remove() {
> + delete(current);
> + keys.remove();
> + }
> +
> + private class DeferredEntry implements Entry<String, V>{
> +
> + private String name;
> +
> + public DeferredEntry(String name) {
> + super();
> + this.name = name;
> + }
> +
> + @Override
> + public String getKey() {
> + return name;
> + }
> +
> + @Override
> + public V getValue() {
> + return get(name);
> + }
> +
> + @Override
> + public V setValue(V value) {
> + throw new UnsupportedOperationException();
> + }
> +
> + }
> +
> + }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> new file mode 100644
> index 0000000..f4513c2
> --- /dev/null
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> @@ -0,0 +1,61 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys.zk;
> +
> +import java.io.IOException;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.drill.exec.coord.ClusterCoordinator;
> +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
> +import org.apache.drill.exec.exception.DrillbitStartupException;
> +import org.apache.drill.exec.store.sys.PStore;
> +import org.apache.drill.exec.store.sys.PStoreConfig;
> +import org.apache.drill.exec.store.sys.PStoreProvider;
> +import org.apache.drill.exec.store.sys.PStoreRegistry;
> +
> +public class ZkPStoreProvider implements PStoreProvider{
> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
> +
> + private final CuratorFramework curator;
> +
> + public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
> + ClusterCoordinator coord = registry.getClusterCoordinator();
> + if (!(coord instanceof ZKClusterCoordinator)) {
> + throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator.");
> + }
> + this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
> + }
> +
> + public ZkPStoreProvider(CuratorFramework curator) {
> + this.curator = curator;
> + }
> +
> + @Override
> + public void close() {
> + }
> +
> + @Override
> + public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
> + return new ZkPStore<V>(curator, store);
> + }
> +
> + @Override
> + public void start() {
> + }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> deleted file mode 100644
> index e2a6ecf..0000000
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> +++ /dev/null
> @@ -1,182 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys.zk;
> -
> -import java.io.IOException;
> -import java.util.Iterator;
> -import java.util.List;
> -import java.util.Map.Entry;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.drill.exec.store.sys.PTable;
> -import org.apache.drill.exec.store.sys.PTableConfig;
> -import org.apache.zookeeper.CreateMode;
> -
> -import com.google.common.base.Preconditions;
> -
> -public class ZkPTable<V> implements PTable<V>{
> -
> - private CuratorFramework framework;
> - private PTableConfig<V> config;
> - private String prefix;
> - private String parent;
> -
> - ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws IOException {
> - super();
> - this.parent = "/" + config.getName();
> - this.prefix = parent + "/";
> - this.framework = framework;
> - this.config = config;
> -
> - // make sure the parent node exists.
> - try{
> - if(framework.checkExists().forPath(parent) == null) {
> - framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
> - }
> - }catch(Exception e){
> - throw new RuntimeException("Failure while accessing Zookeeper", e);
> - }
> -
> - }
> -
> - @Override
> - public Iterator<Entry<String, V>> iterator() {
> - try{
> - List<String> children = framework.getChildren().forPath(parent);
> - return new Iter(children.iterator());
> - }catch(Exception e){
> - throw new RuntimeException("Failure while accessing Zookeeper", e);
> - }
> -
> - }
> -
> - private String p(String key){
> - Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
> - return prefix + key;
> - }
> -
> - @Override
> - public V get(String key) {
> - try{
> - byte[] bytes = framework.getData().forPath(p(key));
> - if(bytes == null){
> - return null;
> - }
> - return config.getSerializer().deserialize(bytes);
> -
> - }catch(Exception e){
> - throw new RuntimeException("Failure while accessing Zookeeper", e);
> - }
> - }
> -
> - @Override
> - public void put(String key, V value) {
> - try{
> - if(framework.checkExists().forPath(p(key)) != null) {
> - framework.setData().forPath(p(key), config.getSerializer().serialize(value));
> - }else{
> - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> - }
> -
> - }catch(Exception e){
> - throw new RuntimeException("Failure while accessing Zookeeper", e);
> - }
> -
> - }
> -
> - @Override
> - public boolean putIfAbsent(String key, V value) {
> - try{
> - if(framework.checkExists().forPath(p(key)) != null) {
> - return false;
> - }else{
> - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
> - return true;
> - }
> -
> - }catch(Exception e){
> - throw new RuntimeException("Failure while accessing Zookeeper", e);
> - }
> - }
> -
> - @Override
> - public void delete(String key) {
> - try{
> - framework.delete().forPath(p(key));
> - }catch(Exception e){
> - throw new RuntimeException("Failure while accessing Zookeeper", e);
> - }
> - }
> -
> -
> - private class Iter implements Iterator<Entry<String, V>>{
> -
> - private Iterator<String> keys;
> - private String current;
> -
> - public Iter(Iterator<String> keys) {
> - super();
> - this.keys = keys;
> - }
> -
> - @Override
> - public boolean hasNext() {
> - return keys.hasNext();
> - }
> -
> - @Override
> - public Entry<String, V> next() {
> - current = keys.next();
> - return new DeferredEntry(current);
> - }
> -
> - @Override
> - public void remove() {
> - delete(current);
> - keys.remove();
> - }
> -
> -
> - private class DeferredEntry implements Entry<String, V>{
> -
> - private String name;
> -
> -
> - public DeferredEntry(String name) {
> - super();
> - this.name = name;
> - }
> -
> - @Override
> - public String getKey() {
> - return name;
> - }
> -
> - @Override
> - public V getValue() {
> - return get(name);
> - }
> -
> - @Override
> - public V setValue(V value) {
> - throw new UnsupportedOperationException();
> - }
> -
> - }
> - }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> deleted file mode 100644
> index 8d2e153..0000000
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> +++ /dev/null
> @@ -1,50 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys.zk;
> -
> -import java.io.IOException;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.drill.exec.store.sys.PTable;
> -import org.apache.drill.exec.store.sys.PTableConfig;
> -import org.apache.drill.exec.store.sys.TableProvider;
> -
> -public class ZkTableProvider implements TableProvider{
> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class);
> -
> - private final CuratorFramework curator;
> -
> - public ZkTableProvider(CuratorFramework curator){
> - this.curator = curator;
> - }
> -
> - @Override
> - public void close() {
> - }
> -
> - @Override
> - public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException {
> - return new ZkPTable<V>(curator, table);
> - }
> -
> - @Override
> - public void start() {
> - }
> -
> -
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> index 1eae8c5..71e4e8e 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler;
> import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
> import org.apache.drill.exec.server.BootStrapContext;
> import org.apache.drill.exec.server.DrillbitContext;
> -import org.apache.drill.exec.store.sys.TableProvider;
> +import org.apache.drill.exec.store.sys.PStoreProvider;
> import org.apache.drill.exec.work.batch.ControlHandlerImpl;
> import org.apache.drill.exec.work.batch.ControlMessageHandler;
> import org.apache.drill.exec.work.foreman.Foreman;
> @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{
> this.dataHandler = new DataResponseHandlerImpl(bee);
> }
>
> - public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, TableProvider provider){
> + public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){
> this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider);
> // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
> executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
> @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{
> @Override
> public void close() throws IOException {
> try {
> - executor.awaitTermination(1, TimeUnit.SECONDS);
> + if (executor != null) {
> + executor.awaitTermination(1, TimeUnit.SECONDS);
> + }
> } catch (InterruptedException e) {
> logger.warn("Executor interrupted while awaiting termination");
> }
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
> index 6a7c9d5..9ce22c7 100644
> --- a/exec/java-exec/src/main/resources/drill-module.conf
> +++ b/exec/java-exec/src/main/resources/drill-module.conf
> @@ -94,7 +94,8 @@ drill.exec: {
> affinity.factor: 1.2,
> executor.threads: 4
> },
> - sys.tables: {
> + sys.store.provider: {
> + class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
> local: {
> path: "/tmp/drill",
> write: true
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> index 547af34..ad114ab 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager;
> import org.apache.drill.exec.server.options.SessionOptionManager;
> import org.apache.drill.exec.server.options.SystemOptionManager;
> import org.apache.drill.exec.store.StoragePluginRegistry;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
> import org.junit.Rule;
> import org.junit.rules.TestRule;
>
> @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{
> final DistributedCache cache = new LocalCache();
> cache.run();
>
> - final LocalTableProvider provider = new LocalTableProvider(config);
> + final LocalPStoreProvider provider = new LocalPStoreProvider(config);
> provider.start();
>
> final SystemOptionManager opt = new SystemOptionManager(config, provider);
> @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{
> result = opt;
> dbContext.getCache();
> result = cache;
> - dbContext.getSystemTableProvider();
> + dbContext.getPersistentStoreProvider();
> result = provider;
> }
> };
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> index 8fc37f3..199ecfc 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit;
> import org.apache.drill.exec.server.DrillbitContext;
> import org.apache.drill.exec.server.RemoteServiceSet;
> import org.apache.drill.exec.store.StoragePluginRegistry;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
> import org.apache.drill.exec.vector.ValueVector;
> import org.apache.drill.exec.vector.VarBinaryVector;
> import org.junit.AfterClass;
> @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
> }
> };
> RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
> - DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalTableProvider(DrillConfig.create()));
> + DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
> QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext);
> PhysicalPlanReader reader = bitContext.getPlanReader();
> LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> new file mode 100644
> index 0000000..6f7794b
> --- /dev/null
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> @@ -0,0 +1,66 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys;
> +
> +import static org.junit.Assert.assertEquals;
> +import static org.junit.Assert.assertFalse;
> +import static org.junit.Assert.assertTrue;
> +
> +import java.util.Iterator;
> +import java.util.Map;
> +import java.util.Map.Entry;
> +
> +import com.fasterxml.jackson.databind.ObjectMapper;
> +import com.google.common.collect.Maps;
> +
> +public class PStoreTestUtil {
> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
> +
> + public static void test(PStoreProvider provider) throws Exception{
> + PStore<String> store = provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
> + String[] keys = {"first", "second"};
> + String[] values = {"value1", "value2"};
> + Map<String, String> expectedMap = Maps.newHashMap();
> +
> + for(int i =0; i < keys.length; i++){
> + expectedMap.put(keys[i], values[i]);
> + store.put(keys[i], values[i]);
> + }
> +
> + {
> + Iterator<Map.Entry<String, String>> iter = store.iterator();
> + for(int i =0; i < keys.length; i++){
> + Entry<String, String> e = iter.next();
> + assertTrue(expectedMap.containsKey(e.getKey()));
> + assertEquals(expectedMap.get(e.getKey()), e.getValue());
> + }
> +
> + assertFalse(iter.hasNext());
> + }
> +
> + {
> + Iterator<Map.Entry<String, String>> iter = store.iterator();
> + while(iter.hasNext()){
> + iter.next();
> + iter.remove();
> + }
> + }
> +
> + assertFalse(store.iterator().hasNext());
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> deleted file mode 100644
> index 47a783b..0000000
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> +++ /dev/null
> @@ -1,66 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys;
> -
> -import static org.junit.Assert.assertEquals;
> -import static org.junit.Assert.assertFalse;
> -import static org.junit.Assert.assertTrue;
> -
> -import java.util.Iterator;
> -import java.util.Map;
> -import java.util.Map.Entry;
> -
> -import com.fasterxml.jackson.databind.ObjectMapper;
> -import com.google.common.collect.Maps;
> -
> -public class PTableTestUtil {
> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class);
> -
> - public static void test(TableProvider provider) throws Exception{
> - PTable<String> table = provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
> - String[] keys = {"first", "second"};
> - String[] values = {"value1", "value2"};
> - Map<String, String> expectedMap = Maps.newHashMap();
> -
> - for(int i =0; i < keys.length; i++){
> - expectedMap.put(keys[i], values[i]);
> - table.put(keys[i], values[i]);
> - }
> -
> - {
> - Iterator<Map.Entry<String, String>> iter = table.iterator();
> - for(int i =0; i < keys.length; i++){
> - Entry<String, String> e = iter.next();
> - assertTrue(expectedMap.containsKey(e.getKey()));
> - assertEquals(expectedMap.get(e.getKey()), e.getValue());
> - }
> -
> - assertFalse(iter.hasNext());
> - }
> -
> - {
> - Iterator<Map.Entry<String, String>> iter = table.iterator();
> - while(iter.hasNext()){
> - iter.next();
> - iter.remove();
> - }
> - }
> -
> - assertFalse(table.iterator().hasNext());
> - }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> new file mode 100644
> index 0000000..18d87c7
> --- /dev/null
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> @@ -0,0 +1,58 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.curator.framework.CuratorFrameworkFactory;
> +import org.apache.curator.retry.RetryNTimes;
> +import org.apache.drill.common.config.DrillConfig;
> +import org.apache.drill.exec.ExecConstants;
> +import org.apache.drill.exec.TestWithZookeeper;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
> +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
> +import org.junit.Test;
> +
> +public class TestPStoreProviders extends TestWithZookeeper {
> + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
> +
> + static LocalPStoreProvider provider;
> +
> + @Test
> + public void verifyLocalStore() throws Exception {
> + try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){
> + PStoreTestUtil.test(provider);
> + }
> + }
> +
> + @Test
> + public void verifyZkStore() throws Exception {
> + DrillConfig config = getConfig();
> + String connect = config.getString(ExecConstants.ZK_CONNECTION);
> + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
> + .namespace(config.getString(ExecConstants.ZK_ROOT))
> + .retryPolicy(new RetryNTimes(1, 100))
> + .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
> + .connectString(connect);
> +
> + try(CuratorFramework curator = builder.build()){
> + curator.start();
> + ZkPStoreProvider provider = new ZkPStoreProvider(curator);
> + PStoreTestUtil.test(provider);
> + }
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> deleted file mode 100644
> index b7d92fe..0000000
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> +++ /dev/null
> @@ -1,58 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.curator.framework.CuratorFrameworkFactory;
> -import org.apache.curator.retry.RetryNTimes;
> -import org.apache.drill.common.config.DrillConfig;
> -import org.apache.drill.exec.ExecConstants;
> -import org.apache.drill.exec.TestWithZookeeper;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> -import org.apache.drill.exec.store.sys.zk.ZkTableProvider;
> -import org.junit.Test;
> -
> -public class TestTableProviders extends TestWithZookeeper {
> - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class);
> -
> - static LocalTableProvider provider;
> -
> - @Test
> - public void verifyLocalTable() throws Exception {
> - try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){
> - PTableTestUtil.test(provider);
> - }
> - }
> -
> - @Test
> - public void verifyZkTable() throws Exception {
> - DrillConfig config = getConfig();
> - String connect = config.getString(ExecConstants.ZK_CONNECTION);
> - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
> - .namespace(config.getString(ExecConstants.ZK_ROOT))
> - .retryPolicy(new RetryNTimes(1, 100))
> - .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
> - .connectString(connect);
> -
> - try(CuratorFramework curator = builder.build()){
> - curator.start();
> - ZkTableProvider provider = new ZkTableProvider(curator);
> - PTableTestUtil.test(provider);
> - }
> - }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml
> ----------------------------------------------------------------------
> diff --git a/pom.xml b/pom.xml
> index b2289f3..500f0fd 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -262,7 +262,7 @@
> <artifactId>maven-surefire-plugin</artifactId>
> <version>2.17</version>
> <configuration>
> - <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
> + <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
> <forkCount>4</forkCount>
> <reuseForks>true</reuseForks>
> <additionalClasspathElements>
>