You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/11/20 17:44:38 UTC
[02/12] incubator-drill git commit: DRILL-1684, DRILL-1517,
DRILL-1350: Profile and cancellation updates - Remove any storage of
persisted profiles. - Store a separate query info object for active queries.
- Update cancellation and running profile loading to query foreman server.
DRILL-1684, DRILL-1517, DRILL-1350: Profile and cancellation updates
- Remove any storage of persisted profiles.
- Store a separate query info object for active queries.
- Update cancellation and running profile loading to query foreman server.
- Make file store support HDFS APIs
- Update PStoreProvider to use the store configuration to select PERSISTENT, EPHEMERAL, or BLOB storage, rather than exposing separate interfaces for each.
- Update ZkPStore's persistent mode to leverage a cache and respond to changes rather than actively probing values.
- Update ZkPStore's cache to be effectively write-through.
- Automatically delete deprecated or default value options from PStore.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2eb72a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2eb72a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2eb72a7c
Branch: refs/heads/master
Commit: 2eb72a7c238fe960b61edbae0d3dea1c836c746f
Parents: 90c12c8
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Nov 9 15:16:52 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Nov 19 21:35:39 2014 -0800
----------------------------------------------------------------------
.../exec/store/hbase/config/HBasePStore.java | 17 +-
.../store/hbase/config/HBasePStoreProvider.java | 45 +-
.../drill/hbase/TestHBaseTableProvider.java | 4 +-
.../exec/store/mongo/config/MongoPStore.java | 10 -
.../store/mongo/config/MongoPStoreProvider.java | 16 +-
.../exec/rpc/control/ControlRpcConfig.java | 1 +
.../drill/exec/rpc/control/ControlTunnel.java | 20 +
.../drill/exec/rpc/data/DataRpcConfig.java | 1 +
.../server/options/FallbackOptionManager.java | 97 ++
.../server/options/FragmentOptionManager.java | 50 +-
.../server/options/InMemoryOptionManager.java | 50 +
.../exec/server/options/QueryOptionManager.java | 72 +-
.../server/options/SessionOptionManager.java | 71 +-
.../server/options/SystemOptionManager.java | 79 +-
.../drill/exec/store/StoragePluginRegistry.java | 2 +-
.../exec/store/dfs/WorkspaceSchemaFactory.java | 3 +-
.../org/apache/drill/exec/store/sys/EStore.java | 6 +-
.../drill/exec/store/sys/EStoreProvider.java | 2 +-
.../org/apache/drill/exec/store/sys/PStore.java | 6 +-
.../drill/exec/store/sys/PStoreConfig.java | 47 +-
.../drill/exec/store/sys/PStoreProvider.java | 4 +-
.../drill/exec/store/sys/local/FilePStore.java | 231 ++++
.../store/sys/local/LocalEStoreProvider.java | 13 +-
.../drill/exec/store/sys/local/LocalPStore.java | 208 ----
.../store/sys/local/LocalPStoreProvider.java | 47 +-
.../drill/exec/store/sys/local/MapEStore.java | 6 +
.../store/sys/local/NoWriteLocalPStore.java | 10 -
.../exec/store/sys/zk/ZkAbstractStore.java | 89 +-
.../exec/store/sys/zk/ZkEStoreProvider.java | 6 +-
.../drill/exec/store/sys/zk/ZkPStore.java | 56 +-
.../exec/store/sys/zk/ZkPStoreProvider.java | 61 +-
.../exec/work/batch/ControlHandlerImpl.java | 18 +-
.../apache/drill/exec/work/foreman/Foreman.java | 2 +-
.../drill/exec/work/foreman/QueryStatus.java | 73 +-
.../apache/drill/exec/work/user/UserWorker.java | 10 +-
.../drill/exec/store/sys/PStoreTestUtil.java | 6 +-
.../exec/store/sys/TestPStoreProviders.java | 2 +-
.../org/apache/drill/exec/proto/BitControl.java | 29 +-
.../drill/exec/proto/SchemaUserBitShared.java | 141 +++
.../apache/drill/exec/proto/UserBitShared.java | 1125 +++++++++++++++++-
.../drill/exec/proto/beans/QueryInfo.java | 253 ++++
protocol/src/main/protobuf/BitControl.proto | 1 +
protocol/src/main/protobuf/UserBitShared.proto | 7 +
43 files changed, 2257 insertions(+), 740 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
index b5a697c..59a3125 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.hbase.config;
import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY;
-import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY_BLOB;
import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.QUALIFIER;
import java.io.IOException;
@@ -63,16 +62,15 @@ public class HBasePStore<V> implements PStore<V> {
return get(key, FAMILY);
}
- @Override
- public V getBlob(String key) {
- return get(key, FAMILY_BLOB);
- }
-
protected synchronized V get(String key, byte[] family) {
try {
Get get = new Get(row(key));
get.addColumn(family, QUALIFIER);
- return value(table.get(get));
+ Result r = table.get(get);
+ if(r.isEmpty()){
+ return null;
+ }
+ return value(r);
} catch (IOException e) {
throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
}
@@ -83,11 +81,6 @@ public class HBasePStore<V> implements PStore<V> {
put(key, FAMILY, value);
}
- @Override
- public void putBlob(String key, V value) {
- put(key, FAMILY_BLOB, value);
- }
-
protected synchronized void put(String key, byte[] family, V value) {
try {
Put put = new Put(row(key));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
index 52808d4..b3947b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
@@ -19,22 +19,16 @@ package org.apache.drill.exec.store.hbase.config;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import com.google.common.collect.Maps;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.store.sys.PStoreRegistry;
import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
-import org.apache.drill.exec.store.sys.local.MapEStore;
-import org.apache.drill.exec.store.sys.zk.ZkEStore;
import org.apache.drill.exec.store.sys.zk.ZkEStoreProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -54,8 +48,6 @@ public class HBasePStoreProvider implements PStoreProvider {
static final byte[] FAMILY = Bytes.toBytes("s");
- static final byte[] FAMILY_BLOB = Bytes.toBytes("t");
-
static final byte[] QUALIFIER = Bytes.toBytes("d");
private final String storeTableName;
@@ -104,11 +96,28 @@ public class HBasePStoreProvider implements PStoreProvider {
this.zkAvailable = false;
}
+
+
@Override
- public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
- return new HBasePStore<V>(store, this.table);
+ public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+ switch(config.getMode()){
+ case EPHEMERAL:
+ if (this.zkAvailable) {
+ return zkEStoreProvider.getStore(config);
+ } else {
+ return localEStoreProvider.getStore(config);
+ }
+
+ case BLOB_PERSISTENT:
+ case PERSISTENT:
+ return new HBasePStore<V>(config, this.table);
+
+ default:
+ throw new IllegalStateException();
+ }
}
+
@Override
public void start() throws IOException {
this.connection = HConnectionManager.createConnection(hbaseConf);
@@ -117,14 +126,13 @@ public class HBasePStoreProvider implements PStoreProvider {
if (!admin.tableExists(storeTableName)) {
HTableDescriptor desc = new HTableDescriptor(storeTableName);
desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1));
- desc.addFamily(new HColumnDescriptor(FAMILY_BLOB).setMaxVersions(1));
admin.createTable(desc);
} else {
HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName));
- if (!desc.hasFamily(FAMILY) || !desc.hasFamily(FAMILY_BLOB)) {
+ if (!desc.hasFamily(FAMILY)) {
throw new DrillRuntimeException("The HBase table " + storeTableName
+ " specified as persistent store exists but does not contain column family: "
- + (desc.hasFamily(FAMILY) ? Bytes.toString(FAMILY_BLOB) : Bytes.toString(FAMILY)));
+ + (Bytes.toString(FAMILY)));
}
}
}
@@ -134,17 +142,6 @@ public class HBasePStoreProvider implements PStoreProvider {
}
@Override
- public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
- // when ZK is available, use ZK as the Ephemeral store.
- // when ZK is not available, use a Map as the Ephemeral store.
- if (this.zkAvailable) {
- return zkEStoreProvider.getEStore(store);
- } else {
- return localEStoreProvider.getEStore(store);
- }
- }
-
- @Override
public synchronized void close() {
if (this.table != null) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
index b1ff44c..5f2d6c7 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
@@ -41,7 +41,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
@Test
public void testTableProvider() throws IOException {
- PStore<String> hbaseStore = provider.getPStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase").build());
+ PStore<String> hbaseStore = provider.getStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase").build());
hbaseStore.put("", "v0");
hbaseStore.put("k1", "v1");
hbaseStore.put("k2", "v2");
@@ -60,7 +60,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
}
assertEquals(7, rowCount);
- PStore<String> hbaseTestStore = provider.getPStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase.test").build());
+ PStore<String> hbaseTestStore = provider.getStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase.test").build());
hbaseTestStore.put("", "v0");
hbaseTestStore.put("k1", "v1");
hbaseTestStore.put("k2", "v2");
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
index f256c98..fc5c05b 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
@@ -69,11 +69,6 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
}
@Override
- public V getBlob(String key) {
- throw new UnsupportedOperationException("Mongo DB PStore not currently supported");
- }
-
- @Override
public void put(String key, V value) {
try {
DBObject putObj = new BasicDBObject(2);
@@ -87,11 +82,6 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
}
@Override
- public void putBlob(String key, V value) {
- throw new UnsupportedOperationException("Mongo DB PStore not currently supported");
- }
-
- @Override
public boolean putIfAbsent(String key, V value) {
try {
DBObject check = new BasicDBObject(1).append(ID, key);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
index eb4ba53..7443c2e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
@@ -68,13 +68,17 @@ public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants
}
@Override
- public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException {
- return localEStoreProvider.getEStore(storeConfig);
- }
+ public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+ switch(config.getMode()){
+ case BLOB_PERSISTENT:
+ case PERSISTENT:
+ return new MongoPStore<>(config, collection);
+ case EPHEMERAL:
+ return localEStoreProvider.getStore(config);
+ default:
+ throw new IllegalStateException();
- @Override
- public <V> PStore<V> getPStore(PStoreConfig<V> config) throws IOException {
- return new MongoPStore<>(config, collection);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 1308c37..37730e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -38,6 +38,7 @@ public class ControlRpcConfig {
.add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
.add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 461cd8a..a4f9fdf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -60,6 +60,12 @@ public class ControlTunnel {
manager.runCommand(b);
}
+ public DrillRpcFuture<Ack> requestCancelQuery(QueryId queryId){
+ CancelQuery c = new CancelQuery(queryId);
+ manager.runCommand(c);
+ return c.getFuture();
+ }
+
public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, FinishedReceiver finishedReceiver){
ReceiverFinished b = new ReceiverFinished(outcomeListener, finishedReceiver);
manager.runCommand(b);
@@ -151,4 +157,18 @@ public class ControlTunnel {
connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class);
}
}
+
+ public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection> {
+ final QueryId queryId;
+
+ public CancelQuery(QueryId queryId) {
+ super();
+ this.queryId = queryId;
+ }
+
+ @Override
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index 75fa17c..b54841d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -38,4 +38,5 @@ public class DataRpcConfig {
public static int RPC_VERSION = 3;
public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+ public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
new file mode 100644
index 0000000..1a5e137
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.eigenbase.sql.SqlLiteral;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public abstract class FallbackOptionManager implements OptionManager{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FallbackOptionManager.class);
+
+ private Map<String, OptionValue> options = Maps.newConcurrentMap();
+ private OptionManager fallback;
+
+ public FallbackOptionManager(OptionManager fallback) {
+ super();
+ this.fallback = fallback;
+ }
+
+ @Override
+ public Iterator<OptionValue> iterator() {
+ return Iterables.concat(fallback, options.values()).iterator();
+ }
+
+ @Override
+ public OptionValue getOption(String name) {
+ OptionValue opt = getLocalOption(name);
+ if(opt == null && fallback != null){
+ return fallback.getOption(name);
+ }else{
+ return opt;
+ }
+ }
+
+ abstract OptionValue getLocalOption(String name);
+ abstract boolean setLocalOption(OptionValue value);
+
+ @Override
+ public void setOption(OptionValue value) {
+ fallback.getAdmin().validate(value);
+ setValidatedOption(value);
+ }
+
+ @Override
+ public void setOption(String name, SqlLiteral literal, OptionType type) {
+ OptionValue val = getAdmin().validate(name, literal);
+ val.type = type;
+ setValidatedOption(val);
+ }
+
+ private void setValidatedOption(OptionValue value) {
+ if (!setLocalOption(value)) {
+ fallback.setOption(value);
+ }
+ }
+
+
+ @Override
+ public OptionAdmin getAdmin() {
+ return fallback.getAdmin();
+ }
+
+ @Override
+ public OptionManager getSystemManager() {
+ return fallback.getSystemManager();
+ }
+
+ @Override
+ public OptionList getOptionList() {
+ OptionList list = new OptionList();
+ for (OptionValue o : options.values()) {
+ list.add(o);
+ }
+ return list;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
index 46d316b..e4dbbf8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
@@ -17,67 +17,31 @@
*/
package org.apache.drill.exec.server.options;
-import java.util.Iterator;
import java.util.Map;
-import org.eigenbase.sql.SqlLiteral;
-
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-public class FragmentOptionManager implements OptionManager {
+public class FragmentOptionManager extends InMemoryOptionManager {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class);
- ImmutableMap<String, OptionValue> options;
- OptionManager systemOptions;
-
public FragmentOptionManager(OptionManager systemOptions, OptionList options) {
+ super(systemOptions, getMapFromOptionList(options));
+ }
+
+ private static Map<String, OptionValue> getMapFromOptionList(OptionList options){
Map<String, OptionValue> tmp = Maps.newHashMap();
for(OptionValue v : options){
tmp.put(v.name, v);
}
- this.options = ImmutableMap.copyOf(tmp);
- this.systemOptions = systemOptions;
- }
-
- @Override
- public Iterator<OptionValue> iterator() {
- return Iterables.concat(systemOptions, options.values()).iterator();
+ return ImmutableMap.copyOf(tmp);
}
@Override
- public OptionValue getOption(String name) {
- OptionValue value = options.get(name);
- if (value == null && systemOptions != null) {
- value = systemOptions.getOption(name);
- }
- return value;
- }
-
- @Override
- public void setOption(OptionValue value) throws SetOptionException {
+ boolean supportsOption(OptionValue value) {
throw new UnsupportedOperationException();
}
- @Override
- public void setOption(String name, SqlLiteral literal, OptionValue.OptionType type) throws SetOptionException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public OptionAdmin getAdmin() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public OptionManager getSystemManager() {
- throw new UnsupportedOperationException();
- }
- @Override
- public OptionList getOptionList() {
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
new file mode 100644
index 0000000..59411ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.options;
+
+import java.util.Map;
+
+public abstract class InMemoryOptionManager extends FallbackOptionManager {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryOptionManager.class);
+
+ final Map<String, OptionValue> options;
+
+ InMemoryOptionManager(OptionManager fallback, Map<String, OptionValue> options) {
+ super(fallback);
+ this.options = options;
+ }
+
+ @Override
+ OptionValue getLocalOption(String name) {
+ return options.get(name);
+ }
+
+ @Override
+ boolean setLocalOption(OptionValue value) {
+ if(supportsOption(value)){
+ options.put(value.name, value);
+ return true;
+ }else{
+ return false;
+ }
+
+ }
+
+ abstract boolean supportsOption(OptionValue value);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
index 82fc5ba..d04b654 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java
@@ -17,81 +17,21 @@
*/
package org.apache.drill.exec.server.options;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.HashMap;
-import org.eigenbase.sql.SqlLiteral;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class QueryOptionManager implements OptionManager {
+public class QueryOptionManager extends InMemoryOptionManager {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
- private Map<String, OptionValue> options = Maps.newConcurrentMap();
- private OptionManager sessionOptions;
-
public QueryOptionManager(OptionManager sessionOptions) {
- super();
- this.sessionOptions = sessionOptions;
- }
-
- @Override
- public Iterator<OptionValue> iterator() {
- return Iterables.concat(sessionOptions, options.values()).iterator();
- }
-
- @Override
- public OptionValue getOption(String name) {
- OptionValue opt = options.get(name);
- if (opt == null && sessionOptions != null) {
- return sessionOptions.getOption(name);
- } else {
- return opt;
- }
- }
-
- @Override
- public void setOption(OptionValue value) {
- sessionOptions.getAdmin().validate(value);
- setValidatedOption(value);
+ super(sessionOptions, new HashMap<String, OptionValue>());
}
@Override
- public void setOption(String name, SqlLiteral literal, OptionValue.OptionType type) {
- OptionValue val = sessionOptions.getAdmin().validate(name, literal);
- val.type = type;
- setValidatedOption(val);
+ boolean supportsOption(OptionValue value) {
+ return value.type == OptionType.QUERY;
}
- private void setValidatedOption(OptionValue value) {
- if (value.type == OptionValue.OptionType.QUERY) {
- options.put(value.name, value);
- } else {
- sessionOptions.setOption(value);
- }
- }
-
- @Override
- public OptionManager.OptionAdmin getAdmin() {
- return sessionOptions.getAdmin();
- }
-
- @Override
- public OptionManager getSystemManager() {
- return sessionOptions.getSystemManager();
- }
-
- @Override
- public OptionList getOptionList() {
- OptionList list = new OptionList();
- list.addAll(sessionOptions.getOptionList());
- list.addAll(options.values());
- return list;
- }
-
- public OptionList getSessionOptionList() {
- return sessionOptions.getOptionList();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index 4268d02..45c0ce8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -17,79 +17,18 @@
*/
package org.apache.drill.exec.server.options;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import org.eigenbase.sql.SqlLiteral;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class SessionOptionManager implements OptionManager{
+public class SessionOptionManager extends InMemoryOptionManager{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
- private Map<String, OptionValue> options = Maps.newConcurrentMap();
- private OptionManager systemOptions;
-
public SessionOptionManager(OptionManager systemOptions) {
- super();
- this.systemOptions = systemOptions;
- }
-
- @Override
- public Iterator<OptionValue> iterator() {
- return Iterables.concat(systemOptions, options.values()).iterator();
- }
-
- @Override
- public OptionValue getOption(String name) {
- OptionValue opt = options.get(name);
- if(opt == null && systemOptions != null){
- return systemOptions.getOption(name);
- }else{
- return opt;
- }
- }
-
- @Override
- public void setOption(OptionValue value) {
- systemOptions.getAdmin().validate(value);
- setValidatedOption(value);
- }
-
- @Override
- public void setOption(String name, SqlLiteral literal, OptionType type) {
- OptionValue val = systemOptions.getAdmin().validate(name, literal);
- val.type = type;
- setValidatedOption(val);
- }
-
- private void setValidatedOption(OptionValue value) {
- if (value.type == OptionType.SESSION) {
- options.put(value.name, value);
- } else {
- systemOptions.setOption(value);
- }
- }
-
- @Override
- public OptionAdmin getAdmin() {
- return systemOptions.getAdmin();
- }
-
- @Override
- public OptionManager getSystemManager() {
- return systemOptions;
+ super(systemOptions, new ConcurrentHashMap<String, OptionValue>());
}
@Override
- public OptionList getOptionList() {
- OptionList list = new OptionList();
- for (OptionValue o : options.values()) {
- list.add(o);
- }
- return list;
+ boolean supportsOption(OptionValue value) {
+ return value.type == OptionValue.OptionType.SESSION;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9f912e0..1622d7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.server.options;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections.IteratorUtils;
@@ -99,57 +100,61 @@ public class SystemOptionManager implements OptionManager {
}
public SystemOptionManager init() throws IOException{
- this.options = provider.getPStore(config);
+ this.options = provider.getStore(config);
this.admin = new SystemOptionAdmin();
return this;
}
- private class Iter implements Iterator<OptionValue>{
- private Iterator<Map.Entry<String, OptionValue>> inner;
-
- public Iter(Iterator<Map.Entry<String, OptionValue>> inner) {
- this.inner = inner;
- }
-
- @Override
- public boolean hasNext() {
- return inner.hasNext();
- }
-
- @Override
- public OptionValue next() {
- return inner.next().getValue();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
+ /**
+ * Iterates over the effective system options: the default of every validator in knownOptions,
+ * with each value held in the backing store replacing the entry registered under the same name.
+ * The values are collected in a HashMap, so the iteration order is unspecified.
+ */
@Override
public Iterator<OptionValue> iterator() {
- return new Iter(options.iterator());
+ Map<String, OptionValue> buildList = Maps.newHashMap();
+ // seed the result with the default value of every known option.
+ for(OptionValidator v : knownOptions.values()){
+ buildList.put(v.getOptionName(), v.getDefault());
+ }
+ // stored values are added second so they overwrite a default that has the same name.
+ for(Map.Entry<String, OptionValue> v : options){
+ OptionValue value = v.getValue();
+ buildList.put(value.name, value);
+ }
+ return buildList.values().iterator();
}
@Override
public OptionValue getOption(String name) {
- return options.get(name);
+ // check local space
+ OptionValue v = options.get(name);
+ if(v != null){
+ return v;
+ }
+
+ // otherwise, return default.
+ OptionValidator validator = knownOptions.get(name);
+ if(validator == null){
+ return null;
+ }else{
+ return validator.getDefault();
+ }
}
@Override
public void setOption(OptionValue value) {
assert value.type == OptionType.SYSTEM;
admin.validate(value);
- options.put(value.name, value);
+ setOptionInternal(value);
+ }
+
+ /**
+ * Writes an already validated option to the backing store. A value equal to the option's
+ * registered default is not written when nothing is stored under that name, because getOption()
+ * falls back to the default; in every other case the value is stored, so it also replaces an
+ * earlier non-default override.
+ * @param value the validated option value to store.
+ */
+ private void setOptionInternal(OptionValue value){
+ // knownOptions maps names to validators, so the comparison has to use the validator's default
+ // value; comparing with the validator object itself would never match an OptionValue.
+ OptionValidator validator = knownOptions.get(value.name);
+ boolean restatesDefault = validator != null && value.equals(validator.getDefault());
+ if(restatesDefault && options.get(value.name) == null){
+ return;
+ }
+ options.put(value.name, value);
+ }
+
@Override
public void setOption(String name, SqlLiteral literal, OptionType type) {
assert type == OptionValue.OptionType.SYSTEM;
OptionValue v = admin.validate(name, literal);
v.type = type;
- options.put(name, v);
+ setOptionInternal(v);
}
@Override
@@ -172,10 +177,24 @@ public class SystemOptionManager implements OptionManager {
+ /**
+ * Registers every validator in VALIDATORS, then prunes the backing store: entries whose key has
+ * no registered validator are deleted as deprecated, and entries equal to their validator's
+ * default are deleted because getOption() already falls back to that default.
+ * NOTE(review): entries are deleted while iterating over the store; this assumes the store's
+ * iterator tolerates concurrent deletes - TODO confirm for each PStore implementation.
+ */
public SystemOptionAdmin() {
for(OptionValidator v : VALIDATORS) {
knownOptions.put(v.getOptionName(), v);
- options.putIfAbsent(v.getOptionName(), v.getDefault());
}
- }
+ for(Entry<String, OptionValue> v : options){
+ // the store key drives lookup, deletion and logging so all three refer to the same entry.
+ String name = v.getKey();
+ OptionValue value = v.getValue();
+ OptionValidator defaultValidator = knownOptions.get(name);
+ if(defaultValidator == null){
+ // deprecated option, delete.
+ options.delete(name);
+ logger.warn("Deleting deprecated option `{}`.", name);
+ }else if(value.equals(defaultValidator.getDefault())){
+ // compare with the validator's default value, not with the validator object itself.
+ // option set with default value, remove storage of record.
+ options.delete(name);
+ logger.warn("Deleting option `{}` set to default value.", name);
+ }
+
+ }
+
+ }
@Override
public void registerOptionType(OptionValidator validator) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index d6c8ee0..5d0eed6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -87,7 +87,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
this.context = context;
this.pluginSystemTable = context //
.getPersistentStoreProvider() //
- .getPStore(PStoreConfig //
+ .getStore(PStoreConfig //
.newJacksonBuilder(context.getConfig().getMapper(), StoragePluginConfig.class) //
.name("sys.storage_plugins") //
.build());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index acd8fcb..7b9d52c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -88,8 +88,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa
if (storageEngineName == null) {
this.knownViews = null;
} else {
- this.knownViews = provider.getPStore(PStoreConfig //
+ this.knownViews = provider.getStore(PStoreConfig //
.newJacksonBuilder(drillConfig.getMapper(), String.class) //
+ .persist() //
.name(Joiner.on('.').join("storage.views", storageEngineName, schemaName)) //
.build());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
index 7214092..2d04957 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
@@ -18,15 +18,11 @@
package org.apache.drill.exec.store.sys;
-import java.util.Map;
/**
* Interfaces to define EStore, which is keep track of status/information for running queries. The information
* would be gone, if the query is completed, or the foreman drillbit is not responding.
* @param <V>
*/
-public interface EStore <V> extends Iterable<Map.Entry<String, V>> {
- public V get(String key);
- public void put(String key, V value);
- public void delete(String key);
+public interface EStore <V> extends PStore<V> {
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
index 32bf0b1..b09c5b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
@@ -25,5 +25,5 @@ import java.io.IOException;
*/
public interface EStoreProvider {
- public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException;
+ public <V> PStore<V> getStore(PStoreConfig<V> table) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
index 040a99d..26c00ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
@@ -19,11 +19,13 @@ package org.apache.drill.exec.store.sys;
import java.util.Map;
+/**
+ * Interface for reading and writing values in a persistent storage provider. Iterators are guaranteed to return entries in key order.
+ * @param <V> the type of value held in the store
+ */
public interface PStore<V> extends Iterable<Map.Entry<String, V>> {
public V get(String key);
- public V getBlob(String key);
public void put(String key, V value);
- public void putBlob(String key, V value);
public boolean putIfAbsent(String key, V value);
public void delete(String key);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
index 7d5243f..83c2243 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
@@ -31,11 +31,25 @@ public class PStoreConfig<V> {
private final String name;
private final PClassSerializer<V> valueSerializer;
+ private final Mode mode;
+ private final int maxIteratorSize;
- private PStoreConfig(String name, PClassSerializer<V> valueSerializer) {
+ public static enum Mode {PERSISTENT, EPHEMERAL, BLOB_PERSISTENT};
+
+ private PStoreConfig(String name, PClassSerializer<V> valueSerializer, Mode mode, int maxIteratorSize) {
super();
this.name = name;
this.valueSerializer = valueSerializer;
+ this.mode = mode;
+ this.maxIteratorSize = Math.abs(maxIteratorSize);
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
+
+ public int getMaxIteratorSize() {
+ return maxIteratorSize;
}
public String getName() {
@@ -57,20 +71,47 @@ public class PStoreConfig<V> {
public static class PStoreConfigBuilder<V> {
String name;
PClassSerializer<V> serializer;
+ Mode mode = Mode.PERSISTENT;
+ int maxIteratorSize = Integer.MAX_VALUE;
PStoreConfigBuilder(PClassSerializer<V> serializer) {
super();
this.serializer = serializer;
}
- public <X extends Builder> PStoreConfigBuilder<V> name(String name) {
+ public PStoreConfigBuilder<V> name(String name) {
this.name = name;
return this;
}
+ public PStoreConfigBuilder<V> persist(){
+ this.mode = Mode.PERSISTENT;
+ return this;
+ }
+
+ public PStoreConfigBuilder<V> ephemeral(){
+ this.mode = Mode.EPHEMERAL;
+ return this;
+ }
+
+ public PStoreConfigBuilder<V> blob(){
+ this.mode = Mode.BLOB_PERSISTENT;
+ return this;
+ }
+
+ /**
+ * Set the maximum size of the iterator. Positive numbers start from the start of the list. Negative numbers start from the end of the list.
+ * NOTE(review): this builder keeps the value as given, but the PStoreConfig constructor stores Math.abs of it, so the
+ * sign is not visible through getMaxIteratorSize(); confirm whether the negative-number behavior is actually intended.
+ * @param size the requested limit; when this method is not called the builder uses Integer.MAX_VALUE.
+ * @return this builder, so that calls can be chained.
+ */
+ public PStoreConfigBuilder<V> max(int size){
+ this.maxIteratorSize = size;
+ return this;
+ }
+
public PStoreConfig<V> build(){
Preconditions.checkNotNull(name);
- return new PStoreConfig<V>(name, serializer);
+ return new PStoreConfig<V>(name, serializer, mode, maxIteratorSize);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
index f6154e2..6371dfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
@@ -21,7 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
public interface PStoreProvider extends AutoCloseable, Closeable{
- public <V> PStore<V> getPStore(PStoreConfig<V> table) throws IOException;
+
+ public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException;
public void start() throws IOException;
- public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
new file mode 100644
index 0000000..9f6ec29
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.local;
+
+import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+ /**
+ * A PStore that keeps each value in its own file, named key + DRILL_SYS_FILE_SUFFIX, inside a
+ * directory named after the store configuration. All file access goes through the supplied
+ * DrillFileSystem. I/O failures are rethrown as RuntimeException with the IOException as cause.
+ * @param <V> the type of value, converted to and from bytes by the configuration's serializer.
+ */
+ public class FilePStore<V> implements PStore<V> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilePStore.class);
+
+ private final Path basePath;
+ private final PStoreConfig<V> config;
+ private final DrillFileSystem fs;
+
+ /**
+ * Creates the store and makes sure its directory (base + configuration name) exists.
+ * @param fs file system used for every read and write.
+ * @param base parent directory under which the store's own directory is created.
+ * @param config store configuration supplying the name, the serializer and the iterator limit.
+ */
+ public FilePStore(DrillFileSystem fs, Path base, PStoreConfig<V> config) {
+ this.basePath = new Path(base, config.getName());
+ this.config = config;
+ this.fs = fs;
+
+ try {
+ mk(basePath);
+ } catch (IOException e) {
+ // keep the IOException as the cause so the reason the directory could not be created is not lost.
+ throw new RuntimeException("Failure setting pstore configuration path.", e);
+ }
+ }
+
+ /** Creates the given directory (and any missing parents) on the underlying file system. */
+ private void mk(Path path) throws IOException{
+ fs.getUnderlying().mkdirs(path);
+ }
+
+ /**
+ * Returns a file:// path for the directory named by the DRILL_LOG_DIR environment variable,
+ * or for /var/log/drill when the variable is not set.
+ */
+ public static Path getLogDir(){
+ String drillLogDir = System.getenv("DRILL_LOG_DIR");
+ if (drillLogDir == null) {
+ drillLogDir = "/var/log/drill";
+ }
+ return new Path("file://" + new File(drillLogDir).getAbsoluteFile().toString());
+ }
+
+ /**
+ * Builds the file system used by file based stores and creates the root directory on it.
+ * @param config Drill configuration handed to FileSystemCreator.
+ * @param root root directory of the store; getLogDir() is used when this is null.
+ * @return the file system on which the root directory has been created.
+ * @throws IOException if the file system cannot be obtained or the root cannot be created.
+ */
+ public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{
+ Path blobRoot = root == null ? getLogDir() : root;
+ Configuration fsConf = new Configuration();
+ // a root that carries a scheme selects the default file system; otherwise the Hadoop default applies.
+ if(blobRoot.toUri().getScheme() != null){
+ fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
+ }
+
+ DrillFileSystem fs = FileSystemCreator.getFileSystem(config, fsConf);
+ fs.getUnderlying().mkdirs(blobRoot);
+ return fs;
+ }
+
+ /**
+ * Lists the store directory and returns the keys in sorted order, limited to the first
+ * config.getMaxIteratorSize() keys. Values are read lazily, when Entry.getValue() is called.
+ */
+ @Override
+ public Iterator<Entry<String, V>> iterator() {
+ try{
+ List<FileStatus> f = fs.list(false, basePath);
+ if (f == null || f.isEmpty()) {
+ return Collections.emptyIterator();
+ }
+ List<String> files = Lists.newArrayList();
+
+ // only files carrying the store suffix are entries; the suffix is stripped to recover the key.
+ for (FileStatus stat : f) {
+ String s = stat.getPath().getName();
+ if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
+ files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+ }
+ }
+
+ Collections.sort(files);
+ files = files.subList(0, Math.min(files.size(), config.getMaxIteratorSize()));
+ return new Iter(files.iterator());
+
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Maps a key to the path of its file inside the store directory.
+ * @throws IllegalArgumentException if the key contains "/", ":" or "..", which could address a
+ * file outside the store directory.
+ */
+ private Path p(String name) throws IOException {
+ Preconditions.checkArgument(
+ !name.contains("/") &&
+ !name.contains(":") &&
+ !name.contains(".."),
+ "Invalid pstore key: %s", name);
+
+ return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
+ }
+
+ /**
+ * Reads and deserializes the value stored under the key.
+ * @return the value, or null when no file exists for the key.
+ */
+ @Override
+ public V get(String key) {
+ // resolve (and validate) the path once and use it for both the existence check and the read.
+ final Path path;
+ try{
+ path = p(key);
+ if(!fs.getUnderlying().exists(path)){
+ return null;
+ }
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+
+ try (InputStream is = fs.open(path).getInputStream()) {
+ return config.getSerializer().deserialize(IOUtils.toByteArray(is));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Serializes the value and writes it to the file of the key. */
+ @Override
+ public void put(String key, V value) {
+ try (OutputStream os = fs.create(p(key)).getOuputStream()) {
+ IOUtils.write(config.getSerializer().serialize(value), os);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Stores the value only when no file exists for the key. The existence check and the write are
+ * two separate file system calls, so this is not atomic with respect to other writers.
+ * @return true when the value was written, false when the key already existed.
+ */
+ @Override
+ public boolean putIfAbsent(String key, V value) {
+ try {
+ Path p = p(key);
+ if (fs.getUnderlying().exists(p)) {
+ return false;
+ } else {
+ put(key, value);
+ return true;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Deletes the file of the key (non-recursive delete). */
+ @Override
+ public void delete(String key) {
+ try {
+ fs.getUnderlying().delete(p(key), false);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Iterator over a fixed list of keys that hands out entries whose values are read on demand. */
+ private class Iter implements Iterator<Entry<String, V>>{
+
+ private final Iterator<String> keys;
+ private String current;
+
+ public Iter(Iterator<String> keys) {
+ this.keys = keys;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public Entry<String, V> next() {
+ current = keys.next();
+ return new DeferredEntry(current);
+ }
+
+ @Override
+ public void remove() {
+ // let the key iterator check the call first (it throws IllegalStateException when next()
+ // has not been called), so the file is only deleted for a legitimate remove().
+ keys.remove();
+ delete(current);
+ }
+
+ /** Entry that knows only its key and loads the value from the store when asked for it. */
+ private class DeferredEntry implements Entry<String, V> {
+
+ private final String name;
+
+ public DeferredEntry(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getKey() {
+ return name;
+ }
+
+ @Override
+ public V getValue() {
+ return get(name);
+ }
+
+ @Override
+ public V setValue(V value) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
index b505fd4..094d093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
@@ -18,19 +18,24 @@
package org.apache.drill.exec.store.sys.local;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.EStoreProvider;
import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
public class LocalEStoreProvider implements EStoreProvider{
private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = Maps.newConcurrentMap();
@Override
- public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException {
+ public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
+ Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral.");
+
if (! (estores.containsKey(storeConfig)) ) {
EStore<V> p = new MapEStore<V>();
EStore<?> p2 = estores.putIfAbsent(storeConfig, p);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
deleted file mode 100644
index c10f862..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.local;
-
-import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class LocalPStore<V> implements PStore<V> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStore.class);
-
- private static final String BLOB_QUALIFIER = "blob";
-
- private final File basePath;
- private final File blobPath;
- private final PStoreConfig<V> config;
- public LocalPStore(File base, PStoreConfig<V> config) {
- super();
- this.basePath = new File(base, config.getName());
- this.blobPath = new File(basePath, BLOB_QUALIFIER);
- if (!blobPath.exists()) {
- blobPath.mkdirs();
- }
- this.config = config;
- }
-
- @Override
- public Iterator<Entry<String, V>> iterator() {
- String[] f = basePath.list();
- if (f == null) {
- return Collections.emptyIterator();
- }
- List<String> files = Lists.newArrayList();
- for (String s : f) {
- if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
- files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
- }
- }
-
- return new Iter(files.iterator());
- }
-
- private File p(String name, boolean blob) throws IOException {
- Preconditions.checkArgument(
- !name.contains("/") &&
- !name.contains(":") &&
- !name.contains(".."));
-
- File f = new File(blob ? blobPath : basePath, name + DRILL_SYS_FILE_SUFFIX);
- // do this to check file name.
- f.getCanonicalPath();
- return f;
- }
-
- @Override
- public V get(String key) {
- return get(key, false);
- }
-
- @Override
- public V getBlob(String key) {
- return get(key, true);
- }
-
- protected V get(String key, boolean blob) {
- try (InputStream is = new FileInputStream(p(key, blob))) {
- return config.getSerializer().deserialize(IOUtils.toByteArray(is));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void put(String key, V value) {
- put(key, false, value);
- }
-
- @Override
- public void putBlob(String key, V value) {
- put(key, true, value);
- }
-
- protected void put(String key, boolean blob, V value) {
- try (OutputStream os = new FileOutputStream(p(key, blob))) {
- IOUtils.write(config.getSerializer().serialize(value), os);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean putIfAbsent(String key, V value) {
- try {
- File f = p(key, false);
- if (f.exists()) {
- return false;
- } else {
- put(key, value);
- return true;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void delete(String key) {
- try {
- delete(key, false);
- delete(key, true);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- protected void delete(String key, boolean blob) throws IOException {
- try {
- p(key, blob).delete();
- } catch (FileNotFoundException e) { /* ignored */ }
- }
-
- private class Iter implements Iterator<Entry<String, V>>{
-
- private Iterator<String> keys;
- private String current;
-
- public Iter(Iterator<String> keys) {
- super();
- this.keys = keys;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public Entry<String, V> next() {
- current = keys.next();
- return new DeferredEntry(current);
- }
-
- @Override
- public void remove() {
- delete(current);
- keys.remove();
- }
-
- private class DeferredEntry implements Entry<String, V> {
-
- private String name;
-
- public DeferredEntry(String name) {
- super();
- this.name = name;
- }
-
- @Override
- public String getKey() {
- return name;
- }
-
- @Override
- public V getValue() {
- return get(name);
- }
-
- @Override
- public V setValue(V value) {
- throw new UnsupportedOperationException();
- }
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
index c413866..ac53a61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
@@ -17,19 +17,19 @@
*/
package org.apache.drill.exec.store.sys.local;
-import java.io.File;
import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
import org.apache.drill.exec.store.sys.PStoreProvider;
-
-import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.hadoop.fs.Path;
/**
* A really simple provider that stores data in the local file system, one value per file.
@@ -37,21 +37,21 @@ import com.google.common.collect.Maps;
public class LocalPStoreProvider implements PStoreProvider {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStoreProvider.class);
- private File path;
+ private final Path path;
private final boolean enableWrite;
- private ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores;
+ private final ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores;
private final LocalEStoreProvider estoreProvider;
+ private final DrillFileSystem fs;
- public LocalPStoreProvider(DrillConfig config) {
- path = new File(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
- enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE);
- if (!enableWrite) {
- pstores = Maps.newConcurrentMap();
- }
- estoreProvider = new LocalEStoreProvider();
+ public LocalPStoreProvider(DrillConfig config) throws IOException {
+ this.path = new Path(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
+ this.enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE);
+ this.pstores = enableWrite ? null : new ConcurrentHashMap<PStoreConfig<?>, PStore<?>>();
+ this.estoreProvider = new LocalEStoreProvider();
+ this.fs = FilePStore.getFileSystem(config, path);
}
- public LocalPStoreProvider(PStoreRegistry registry) {
+ public LocalPStoreProvider(PStoreRegistry registry) throws IOException {
this(registry.getConfig());
}
@@ -60,14 +60,22 @@ public class LocalPStoreProvider implements PStoreProvider {
}
@Override
- public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException {
- return estoreProvider.getEStore(storeConfig);
+ public <V> PStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
+ // Pick the store implementation from the mode carried by the configuration:
+ // ephemeral stores come from the in-memory estore provider, both persistent modes share getPStore().
+ switch(storeConfig.getMode()){
+ case EPHEMERAL:
+ return estoreProvider.getStore(storeConfig);
+ case BLOB_PERSISTENT:
+ case PERSISTENT:
+ return getPStore(storeConfig);
+ default:
+ // name the offending mode so a newly added Mode constant is easy to diagnose.
+ throw new IllegalStateException("Unsupported store mode: " + storeConfig.getMode());
+ }
+
+ }
- @Override
- public <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException {
+ private <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException {
if (enableWrite) {
- return new LocalPStore<V>(path, storeConfig);
+ return new FilePStore<V>(fs, path, storeConfig);
} else {
PStore<V> p = new NoWriteLocalPStore<V>();
PStore<?> p2 = pstores.putIfAbsent(storeConfig, p);
@@ -78,6 +86,7 @@ public class LocalPStoreProvider implements PStoreProvider {
}
}
+
@Override
public void start() {
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
index 84e5027..2723916 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
@@ -51,4 +51,10 @@ public class MapEStore <V> implements EStore<V> {
public Iterator<Map.Entry<String, V>> iterator() {
return store.entrySet().iterator();
}
+
+ /**
+ * Stores the value under the key unless the key is already present.
+ * @return true when store.putIfAbsent reports no previous value (a null result), false otherwise.
+ */
+ @Override
+ public boolean putIfAbsent(String key, V value) {
+ return null == store.putIfAbsent(key, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
index d1ef931..71a41f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
@@ -47,21 +47,11 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
}
@Override
- public V getBlob(String key) {
- return blobMap.get(key);
- }
-
- @Override
public void put(String key, V value) {
map.put(key, value);
}
@Override
- public void putBlob(String key, V value) {
- blobMap.put(key, value);
- }
-
- @Override
public boolean putIfAbsent(String key, V value) {
return null == map.putIfAbsent(key, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
index b88ff74..d61f3b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
@@ -17,16 +17,25 @@
*/
package org.apache.drill.exec.store.sys.zk;
-import com.google.common.base.Preconditions;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.zookeeper.CreateMode;
-
import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.zookeeper.CreateMode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
/**
* This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store)
* @param <V>
@@ -35,6 +44,7 @@ public abstract class ZkAbstractStore<V> {
protected CuratorFramework framework;
protected PStoreConfig<V> config;
+ private final PathChildrenCache childrenCache;
private String prefix;
private String parent;
@@ -50,16 +60,19 @@ public abstract class ZkAbstractStore<V> {
if (framework.checkExists().forPath(parent) == null) {
framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
}
+
+ this.childrenCache = new PathChildrenCache(framework, parent, true);
+ this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
+
} catch (Exception e) {
- throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
+ throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e);
}
}
public Iterator<Entry<String, V>> iterator() {
try {
- List<String> children = framework.getChildren().forPath(parent);
- return new Iter(children.iterator());
+ return new Iter(childrenCache.getCurrentData());
} catch (Exception e) {
throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
}
@@ -73,10 +86,11 @@ public abstract class ZkAbstractStore<V> {
public V get(String key) {
try {
- byte[] bytes = framework.getData().forPath(p(key));
- if (bytes == null) {
+ ChildData d = childrenCache.getCurrentData(p(key));
+ if(d == null || d.getData() == null){
return null;
}
+ byte[] bytes = d.getData();
return config.getSerializer().deserialize(bytes);
} catch (Exception e) {
@@ -86,11 +100,12 @@ public abstract class ZkAbstractStore<V> {
public void put(String key, V value) {
try {
- if (framework.checkExists().forPath(p(key)) != null) {
+ if (childrenCache.getCurrentData(p(key)) != null) {
framework.setData().forPath(p(key), config.getSerializer().serialize(value));
} else {
createNodeInZK(key, value);
}
+ childrenCache.rebuildNode(p(key));
} catch (Exception e) {
throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
@@ -100,21 +115,43 @@ public abstract class ZkAbstractStore<V> {
public void delete(String key) {
try {
framework.delete().forPath(p(key));
+ childrenCache.rebuildNode(p(key));
} catch (Exception e) {
throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
}
}
+ public boolean putIfAbsent(String key, V value) {
+ try {
+ if (childrenCache.getCurrentData(p(key)) != null) {
+ return false;
+ } else {
+ createNodeInZK(key, value);
+ childrenCache.rebuildNode(p(key));
+ return true;
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failure while accessing Zookeeper", e);
+ }
+ }
+
public abstract void createNodeInZK (String key, V value);
private class Iter implements Iterator<Entry<String, V>>{
- private Iterator<String> keys;
- private String current;
+ private Iterator<ChildData> keys;
+ private ChildData current;
- public Iter(Iterator<String> keys) {
+ public Iter(List<ChildData> children) {
super();
- this.keys = keys;
+ List<ChildData> sortedChildren = Lists.newArrayList(children);
+ Collections.sort(sortedChildren, new Comparator<ChildData>(){
+ @Override
+ public int compare(ChildData o1, ChildData o2) {
+ return o1.getPath().compareTo(o2.getPath());
+ }});
+ this.keys = sortedChildren.iterator();
}
@Override
@@ -130,27 +167,35 @@ public abstract class ZkAbstractStore<V> {
@Override
public void remove() {
- delete(current);
- keys.remove();
+ delete(keyFromPath(current));
+ }
+
+ private String keyFromPath(ChildData data){
+ String path = data.getPath();
+ return path.substring(prefix.length(), path.length());
}
private class DeferredEntry implements Entry<String, V>{
- private String name;
+ private ChildData data;
- public DeferredEntry(String name) {
+ public DeferredEntry(ChildData data) {
super();
- this.name = name;
+ this.data = data;
}
@Override
public String getKey() {
- return name;
+ return keyFromPath(data);
}
@Override
public V getValue() {
- return get(name);
+ try {
+ return config.getSerializer().deserialize(data.getData());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
index 34b59a7..1c2c3fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
@@ -22,6 +22,9 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.EStoreProvider;
import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
+
+import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -33,7 +36,8 @@ public class ZkEStoreProvider implements EStoreProvider{
}
@Override
- public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
+ public <V> EStore<V> getStore(PStoreConfig<V> store) throws IOException {
+ Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL);
return new ZkEStore<V>(curator,store);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
index 601ba8f..a597381 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
@@ -40,17 +40,9 @@ import com.google.common.base.Preconditions;
*/
public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{
- private DrillFileSystem fs;
- private Path blobPath;
- private boolean blobPathCreated;
-
- ZkPStore(CuratorFramework framework, DrillFileSystem fs, Path blobRoot, PStoreConfig<V> config)
+ ZkPStore(CuratorFramework framework, PStoreConfig<V> config)
throws IOException {
super(framework, config);
-
- this.fs = fs;
- this.blobPath = new Path(blobRoot, config.getName());
- this.blobPathCreated = false;
}
@Override
@@ -62,53 +54,7 @@ public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{
}
}
- @Override
- public boolean putIfAbsent(String key, V value) {
- try {
- if (framework.checkExists().forPath(p(key)) != null) {
- return false;
- } else {
- framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
- return true;
- }
-
- } catch (Exception e) {
- throw new RuntimeException("Failure while accessing Zookeeper", e);
- }
- }
-
- @Override
- public V getBlob(String key) {
- try (DrillInputStream is = fs.open(path(key))) {
- return config.getSerializer().deserialize(IOUtils.toByteArray(is.getInputStream()));
- } catch (FileNotFoundException e) {
- return null;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void putBlob(String key, V value) {
- try (DrillOutputStream os = fs.create(path(key))) {
- IOUtils.write(config.getSerializer().serialize(value), os.getOuputStream());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- private Path path(String name) throws IOException {
- Preconditions.checkArgument(
- !name.contains("/") &&
- !name.contains(":") &&
- !name.contains(".."));
- if (!blobPathCreated) {
- fs.mkdirs(blobPath);
- blobPathCreated = true;
- }
-
- return new Path(blobPath, name + DRILL_SYS_FILE_SUFFIX);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index aa64f53..03d2441 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -21,18 +21,17 @@ import java.io.File;
import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.drill.exec.store.sys.local.FilePStore;
import org.apache.hadoop.fs.Path;
import com.google.common.annotations.VisibleForTesting;
@@ -45,9 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider {
private final CuratorFramework curator;
private final DrillFileSystem fs;
-
private final Path blobRoot;
-
private final ZkEStoreProvider zkEStoreProvider;
public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
@@ -59,49 +56,45 @@ public class ZkPStoreProvider implements PStoreProvider {
if (registry.getConfig().hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) {
blobRoot = new Path(registry.getConfig().getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT));
- } else {
- String drillLogDir = System.getenv("DRILL_LOG_DIR");
- if (drillLogDir == null) {
- drillLogDir = "/var/log/drill";
- }
- blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI());
+ }else{
+ blobRoot = FilePStore.getLogDir();
}
- Configuration fsConf = new Configuration();
- fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
- try {
- fs = FileSystemCreator.getFileSystem(registry.getConfig(), fsConf);
- fs.mkdirs(blobRoot);
- } catch (IOException e) {
- throw new DrillbitStartupException("Unable to initialize blob storage.", e);
+
+ try{
+ this.fs = FilePStore.getFileSystem(registry.getConfig(), blobRoot);
+ }catch(IOException e){
+ throw new DrillbitStartupException("Failure while attempting to set up blob store.", e);
}
- zkEStoreProvider = new ZkEStoreProvider(curator);
+
+ this.zkEStoreProvider = new ZkEStoreProvider(curator);
}
@VisibleForTesting
- public ZkPStoreProvider(CuratorFramework curator) {
+ public ZkPStoreProvider(DrillConfig config, CuratorFramework curator) throws IOException {
this.curator = curator;
- this.fs = null;
- String drillLogDir = System.getenv("DRILL_LOG_DIR");
- if (drillLogDir == null) {
- drillLogDir = "/var/log/drill";
- }
- blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI());
- zkEStoreProvider = new ZkEStoreProvider(curator);
+ this.blobRoot = FilePStore.getLogDir();
+ this.fs = FilePStore.getFileSystem(config, blobRoot);
+ this.zkEStoreProvider = new ZkEStoreProvider(curator);
}
@Override
public void close() {
}
- @Override
- public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
- return zkEStoreProvider.getEStore(store);
- }
@Override
- public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
- return new ZkPStore<V>(curator, fs, blobRoot, store);
+ public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+ switch(config.getMode()){
+ case BLOB_PERSISTENT:
+ return new FilePStore<V>(fs, blobRoot, config);
+ case EPHEMERAL:
+ return zkEStoreProvider.getStore(config);
+ case PERSISTENT:
+ return new ZkPStore<V>(curator, config);
+ default:
+ throw new IllegalStateException();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index fc0972f..3228da9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -83,6 +83,16 @@ public class ControlHandlerImpl implements ControlMessageHandler {
// TODO: Support a type of message that has no response.
return DataRpcConfig.OK;
+ case RpcType.REQ_QUERY_CANCEL_VALUE:
+ QueryId id = get(pBody, QueryId.PARSER);
+ Foreman f = bee.getForemanForQueryId(id);
+ if(f != null){
+ f.cancel();
+ return DataRpcConfig.OK;
+ }else{
+ return DataRpcConfig.FAIL;
+ }
+
case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE:
InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
for(int i =0; i < fragments.getFragmentCount(); i++){
@@ -95,13 +105,9 @@ public class ControlHandlerImpl implements ControlMessageHandler {
Foreman foreman = bee.getForemanForQueryId(queryId);
QueryProfile profile;
if (foreman == null) {
- try {
- profile = bee.getContext().getPersistentStoreProvider().getEStore(QueryStatus.RUNNING_QUERY_PROFILE).get(QueryIdHelper.getQueryId(queryId));
- } catch (IOException e) {
- throw new RpcException("Failed to get persistent store", e);
- }
+ throw new RpcException("Query not running on node.");
} else {
- profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile(true);
+ profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile();
}
return new Response(RpcType.RESP_QUERY_STATUS, profile);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index e47c0be..7a0e501 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -436,7 +436,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
@Override
public int compareTo(Object o) {
- return o.hashCode() - o.hashCode();
+ return hashCode() - o.hashCode();
}
}