You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/06/03 04:45:59 UTC
[04/12] drill git commit: DRILL-5481: Allow to persist profiles
in-memory only with a max capacity
DRILL-5481: Allow to persist profiles in-memory only with a max capacity
1. Introduced an InMemoryStoreProvider with the ability to maintain a max capacity
2. DrillbitContext now explicitly has a profileStoreProvider that, by default, re-uses the general PersistentStoreProvider, unless it is InMemory, which is when #1 is used.
2. Cleanly separated out QueryProfileStoreContext
3. Converted literal values to constants within ExecConstants
4. Updated drill-module.conf for default capacity
closes #834
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9ba4af86
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9ba4af86
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9ba4af86
Branch: refs/heads/master
Commit: 9ba4af860e3def8f880eef13e353a730cb3b18ea
Parents: d7bc213
Author: Kunal Khatua <kk...@maprtech.com>
Authored: Mon May 15 13:33:49 2017 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jun 2 21:43:14 2017 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 2 +
.../org/apache/drill/exec/ops/QueryContext.java | 5 +
.../org/apache/drill/exec/server/Drillbit.java | 20 ++-
.../drill/exec/server/DrillbitContext.java | 22 ++-
.../exec/server/QueryProfileStoreContext.java | 79 ++++++++++
.../server/rest/profile/ProfileResources.java | 14 +-
.../exec/store/sys/PersistentStoreConfig.java | 16 ++-
.../exec/store/sys/store/InMemoryStore.java | 143 +++++++++++++++++++
.../store/provider/InMemoryStoreProvider.java | 51 +++++++
.../org/apache/drill/exec/work/WorkManager.java | 5 +-
.../drill/exec/work/foreman/QueryManager.java | 32 ++---
.../src/main/resources/drill-module.conf | 4 +
12 files changed, 357 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 83ffb20..ba98532 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -107,6 +107,8 @@ public interface ExecConstants {
String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
+ String PROFILES_STORE_INMEMORY = "drill.exec.profiles.store.inmemory";
+ String PROFILES_STORE_CAPACITY = "drill.exec.profiles.store.capacity";
String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms";
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index df3f4f4..0dbeea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.QueryOptionManager;
@@ -209,6 +210,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
return drillbitContext.getConfig();
}
+ public QueryProfileStoreContext getProfileStoreContext() {
+ return drillbitContext.getProfileStoreContext();
+ }
+
@Override
public FunctionImplementationRegistry getFunctionRegistry() {
return drillbitContext.getFunctionImplementationRegistry();
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index f225714..0d341df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
@@ -76,6 +77,7 @@ public class Drillbit implements AutoCloseable {
private final WebServer webServer;
private RegistrationHandle registrationHandle;
private volatile StoragePluginRegistry storageRegistry;
+ private final PersistentStoreProvider profileStoreProvider;
@VisibleForTesting
public Drillbit(
@@ -105,6 +107,14 @@ public class Drillbit implements AutoCloseable {
isDistributedMode = true;
}
+ //Check if InMemory Profile Store, else use Default Store Provider
+ if (config.getBoolean(ExecConstants.PROFILES_STORE_INMEMORY)) {
+ profileStoreProvider = new InMemoryStoreProvider(config.getInt(ExecConstants.PROFILES_STORE_CAPACITY));
+ logger.info("Upto {} latest query profiles will be retained in-memory", config.getInt(ExecConstants.PROFILES_STORE_CAPACITY));
+ } else {
+ profileStoreProvider = storeProvider;
+ }
+
engine = new ServiceEngine(manager, context, allowPortHunting, isDistributedMode);
logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
@@ -115,8 +125,11 @@ public class Drillbit implements AutoCloseable {
logger.debug("Startup begun.");
coord.start(10000);
storeProvider.start();
+ if (profileStoreProvider != storeProvider) {
+ profileStoreProvider.start();
+ }
final DrillbitEndpoint md = engine.start();
- manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
+ manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider, profileStoreProvider);
final DrillbitContext drillbitContext = manager.getContext();
storageRegistry = drillbitContext.getStorage();
storageRegistry.init();
@@ -164,6 +177,11 @@ public class Drillbit implements AutoCloseable {
manager,
storageRegistry,
context);
+
+ //Closing the profile store provider if distinct
+ if (storeProvider != profileStoreProvider) {
+ AutoCloseables.close(profileStoreProvider);
+ }
} catch(Exception e) {
logger.warn("Failure on close()", e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 6c68ab2..b8d3e68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -65,7 +65,7 @@ public class DrillbitContext implements AutoCloseable {
private final LogicalPlanPersistence lpPersistence;
// operator table for standard SQL operators and functions, Drill built-in UDFs
private final DrillOperatorTable table;
-
+ private final QueryProfileStoreContext profileStoreContext;
public DrillbitContext(
DrillbitEndpoint endpoint,
@@ -75,6 +75,19 @@ public class DrillbitContext implements AutoCloseable {
DataConnectionCreator connectionsPool,
WorkEventBus workBus,
PersistentStoreProvider provider) {
+ //PersistentStoreProvider is re-used for providing Query Profile Store as well
+ this(endpoint, context, coord, controller, connectionsPool, workBus, provider, provider);
+ }
+
+ public DrillbitContext(
+ DrillbitEndpoint endpoint,
+ BootStrapContext context,
+ ClusterCoordinator coord,
+ Controller controller,
+ DataConnectionCreator connectionsPool,
+ WorkEventBus workBus,
+ PersistentStoreProvider provider,
+ PersistentStoreProvider profileStoreProvider) {
this.classpathScan = context.getClasspathScan();
this.workBus = workBus;
this.controller = checkNotNull(controller);
@@ -97,6 +110,13 @@ public class DrillbitContext implements AutoCloseable {
// This operator table is built once and used for all queries which do not need dynamic UDF support.
this.table = new DrillOperatorTable(functionRegistry, systemOptions);
+
+ //This profile store context is built from the profileStoreProvider
+ this.profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord);
+ }
+
+ public QueryProfileStoreContext getProfileStoreContext() {
+ return profileStoreContext;
}
public FunctionImplementationRegistry getFunctionImplementationRegistry() {
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java
new file mode 100644
index 0000000..7f282d5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig.StoreConfigBuilder;
+
+public class QueryProfileStoreContext {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryProfileStoreContext.class);
+
+ private static final String PROFILES = "profiles";
+
+ private static final String RUNNING = "running";
+
+ private final PersistentStore<UserBitShared.QueryProfile> completedProfiles;
+
+ private final TransientStore<UserBitShared.QueryInfo> runningProfiles;
+
+ private final PersistentStoreConfig<QueryProfile> profileStoreConfig;
+
+ public QueryProfileStoreContext(DrillConfig config, PersistentStoreProvider storeProvider,
+ ClusterCoordinator coordinator) {
+ profileStoreConfig = PersistentStoreConfig.newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE,
+ SchemaUserBitShared.QueryProfile.MERGE)
+ .name(PROFILES)
+ .blob()
+ .build();
+
+ try {
+ completedProfiles = storeProvider.getOrCreateStore(profileStoreConfig);
+ } catch (final Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+
+ runningProfiles = coordinator.getOrCreateTransientStore(TransientStoreConfig
+ .newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
+ .name(RUNNING)
+ .build());
+ }
+
+ public PersistentStoreConfig<QueryProfile> getProfileStoreConfig() {
+ return profileStoreConfig;
+ }
+
+ public PersistentStore<QueryProfile> getCompletedProfileStore() {
+ return completedProfiles;
+ }
+
+ public TransientStore<QueryInfo> getRunningProfileStore() {
+ return runningProfiles;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 044b792..468ec56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.server.rest.ViewableWithPermissions;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.store.sys.PersistentStore;
@@ -180,8 +181,9 @@ public class ProfileResources {
@Produces(MediaType.APPLICATION_JSON)
public QProfiles getProfilesJSON(@Context UriInfo uriInfo) {
try {
- final PersistentStore<QueryProfile> completed = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
- final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+ final QueryProfileStoreContext profileStoreContext = work.getContext().getProfileStoreContext();
+ final PersistentStore<QueryProfile> completed = profileStoreContext.getCompletedProfileStore();
+ final TransientStore<QueryInfo> running = profileStoreContext.getRunningProfileStore();
final List<String> errors = Lists.newArrayList();
@@ -258,7 +260,7 @@ public class ProfileResources {
// then check remote running
try {
- final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+ final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
final QueryInfo info = running.get(queryId);
if (info != null) {
QueryProfile queryProfile = work.getContext()
@@ -275,7 +277,7 @@ public class ProfileResources {
// then check blob store
try {
- final PersistentStore<QueryProfile> profiles = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
+ final PersistentStore<QueryProfile> profiles = work.getContext().getProfileStoreContext().getCompletedProfileStore();
final QueryProfile queryProfile = profiles.get(queryId);
if (queryProfile != null) {
checkOrThrowProfileViewAuthorization(queryProfile);
@@ -296,7 +298,7 @@ public class ProfileResources {
@Produces(MediaType.APPLICATION_JSON)
public String getProfileJSON(@PathParam("queryid") String queryId) {
try {
- return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
+ return new String(work.getContext().getProfileStoreContext().getProfileStoreConfig().getSerializer().serialize(getQueryProfile(queryId)));
} catch (Exception e) {
logger.debug("Failed to serialize profile for: " + queryId);
return ("{ 'message' : 'error (unable to serialize profile)' }");
@@ -329,7 +331,7 @@ public class ProfileResources {
// then check remote running
try {
- final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+ final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
final QueryInfo info = running.get(queryId);
checkOrThrowQueryCancelAuthorization(info.getUser(), queryId);
Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
index 00a75a2..3b5e7ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
@@ -38,11 +38,17 @@ public class PersistentStoreConfig<V> {
private final String name;
private final InstanceSerializer<V> valueSerializer;
private final PersistentStoreMode mode;
+ private final int capacity;
- protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode) {
+ protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode, int capacity) {
this.name = name;
this.valueSerializer = valueSerializer;
this.mode = mode;
+ this.capacity = capacity;
+ }
+
+ public int getCapacity() {
+ return capacity;
}
public PersistentStoreMode getMode() {
@@ -85,6 +91,7 @@ public class PersistentStoreConfig<V> {
private String name;
private InstanceSerializer<V> serializer;
private PersistentStoreMode mode = PersistentStoreMode.PERSISTENT;
+ private int capacity;
protected StoreConfigBuilder(InstanceSerializer<V> serializer) {
super();
@@ -106,9 +113,14 @@ public class PersistentStoreConfig<V> {
return this;
}
+ public StoreConfigBuilder<V> setCapacity(int capacity) {
+ this.capacity = capacity;
+ return this;
+ }
+
public PersistentStoreConfig<V> build(){
Preconditions.checkNotNull(name);
- return new PersistentStoreConfig<>(name, serializer, mode);
+ return new PersistentStoreConfig<>(name, serializer, mode, capacity);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
new file mode 100644
index 0000000..10da92d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.store.sys.BasePersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+
+import com.google.common.collect.Iterables;
+
+public class InMemoryStore<V> extends BasePersistentStore<V> {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryPersistentStore.class);
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
+ private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
+ private final ConcurrentSkipListMap<String, V> store;
+ private int version = -1;
+ private final int capacity;
+ private final AtomicInteger currentSize = new AtomicInteger();
+
+ public InMemoryStore(int capacity) {
+ this.capacity = capacity;
+ //Allows us to trim out the oldest elements to maintain finite max size
+ this.store = new ConcurrentSkipListMap<String, V>();
+ }
+
+ @Override
+ public void delete(final String key) {
+ try (AutoCloseableLock lock = writeLock.open()) {
+ store.remove(key);
+ version++;
+ }
+ }
+
+ @Override
+ public PersistentStoreMode getMode() {
+ return PersistentStoreMode.BLOB_PERSISTENT;
+ }
+
+ @Override
+ public boolean contains(final String key) {
+ return contains(key, null);
+ }
+
+ @Override
+ public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
+ try (AutoCloseableLock lock = readLock.open()) {
+ if (dataChangeVersion != null) {
+ dataChangeVersion.setVersion(version);
+ }
+ return store.containsKey(key);
+ }
+ }
+
+ @Override
+ public V get(final String key) {
+ return get(key, null);
+ }
+
+ @Override
+ public V get(final String key, final DataChangeVersion dataChangeVersion) {
+ try (AutoCloseableLock lock = readLock.open()) {
+ if (dataChangeVersion != null) {
+ dataChangeVersion.setVersion(version);
+ }
+ return store.get(key);
+ }
+ }
+
+ @Override
+ public void put(final String key, final V value) {
+ put(key, value, null);
+ }
+
+ @Override
+ public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
+ try (AutoCloseableLock lock = writeLock.open()) {
+ if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
+ throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
+ }
+ store.put(key, value);
+ if (currentSize.incrementAndGet() > capacity) {
+ //Pop Out Oldest
+ store.pollLastEntry();
+ currentSize.decrementAndGet();
+ }
+
+ version++;
+ }
+ }
+
+ @Override
+ public boolean putIfAbsent(final String key, final V value) {
+ try (AutoCloseableLock lock = writeLock.open()) {
+ final V old = store.putIfAbsent(key, value);
+ if (old == null) {
+ version++;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
+ try (AutoCloseableLock lock = readLock.open()) {
+ return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ try (AutoCloseableLock lock = writeLock.open()) {
+ store.clear();
+ version = -1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
new file mode 100644
index 0000000..ffe7b18
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store.provider;
+
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.InMemoryStore;
+
+public class InMemoryStoreProvider implements PersistentStoreProvider {
+
+ private int capacity;
+
+ public InMemoryStoreProvider(int capacity) {
+ this.capacity = capacity;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
+ return new InMemoryStore<>(capacity);
+ }
+
+ @Override
+ public void start() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index c352861..2d37b8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -102,8 +102,9 @@ public class WorkManager implements AutoCloseable {
final Controller controller,
final DataConnectionCreator data,
final ClusterCoordinator coord,
- final PersistentStoreProvider provider) {
- dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider);
+ final PersistentStoreProvider provider,
+ final PersistentStoreProvider profilesProvider) {
+ dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, profilesProvider);
statusThread.start();
DrillMetrics.register("drill.fragments.running",
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 77c20a5..ecbccf3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -31,7 +31,6 @@ import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
-import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -52,7 +51,6 @@ import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.store.sys.PersistentStore;
-import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
@@ -68,17 +66,6 @@ import com.google.common.collect.Maps;
public class QueryManager implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
- public static final PersistentStoreConfig<QueryProfile> QUERY_PROFILE = PersistentStoreConfig.
- newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE)
- .name("profiles")
- .blob()
- .build();
-
- public static final TransientStoreConfig<QueryInfo> RUNNING_QUERY_INFO = TransientStoreConfig
- .newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
- .name("running")
- .build();
-
private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
private final QueryId queryId;
private final String stringQueryId;
@@ -93,8 +80,8 @@ public class QueryManager implements AutoCloseable {
new IntObjectHashMap<>();
private final List<FragmentData> fragmentDataSet = Lists.newArrayList();
- private final PersistentStore<QueryProfile> profileStore;
- private final TransientStore<QueryInfo> transientProfiles;
+ private final PersistentStore<QueryProfile> completedProfileStore;
+ private final TransientStore<QueryInfo> runningProfileStore;
// the following mutable variables are used to capture ongoing query status
private String planText;
@@ -119,12 +106,9 @@ public class QueryManager implements AutoCloseable {
this.foreman = foreman;
stringQueryId = QueryIdHelper.getQueryId(queryId);
- try {
- profileStore = storeProvider.getOrCreateStore(QUERY_PROFILE);
- } catch (final Exception e) {
- throw new DrillRuntimeException(e);
- }
- transientProfiles = coordinator.getOrCreateTransientStore(RUNNING_QUERY_INFO);
+
+ this.completedProfileStore = foreman.getQueryContext().getProfileStoreContext().getCompletedProfileStore();
+ this.runningProfileStore = foreman.getQueryContext().getProfileStoreContext().getRunningProfileStore();
}
private static boolean isTerminal(final FragmentState state) {
@@ -298,7 +282,7 @@ public class QueryManager implements AutoCloseable {
case STARTING:
case RUNNING:
case CANCELLATION_REQUESTED:
- transientProfiles.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
+ runningProfileStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
inTransientStore = true;
break;
@@ -306,7 +290,7 @@ public class QueryManager implements AutoCloseable {
case CANCELED:
case FAILED:
try {
- transientProfiles.remove(stringQueryId);
+ runningProfileStore.remove(stringQueryId);
inTransientStore = false;
} catch(final Exception e) {
logger.warn("Failure while trying to delete the estore profile for this query.", e);
@@ -321,7 +305,7 @@ public class QueryManager implements AutoCloseable {
void writeFinalProfile(UserException ex) {
try {
// TODO(DRILL-2362) when do these ever get deleted?
- profileStore.put(stringQueryId, getQueryProfile(ex));
+ completedProfileStore.put(stringQueryId, getQueryProfile(ex));
} catch (Exception e) {
logger.error("Failure while storing Query Profile", e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7c095ac..5ba4526 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -141,6 +141,10 @@ drill.exec: {
write: true
}
},
+ profiles.store: {
+ inmemory: false,
+ capacity: 1000
+ },
impersonation: {
enabled: false,
max_chained_user_hops: 3