You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/03/26 11:35:00 UTC
[05/13] drill git commit: DRILL-6053: Avoid excessive locking in
LocalPersistentStore
DRILL-6053: Avoid excessive locking in LocalPersistentStore
closes #1163
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/590a72bc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/590a72bc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/590a72bc
Branch: refs/heads/master
Commit: 590a72bc667f6bc373130bcae58c22c11f13edaf
Parents: 9327ca6
Author: Vlad Rozov <vr...@apache.org>
Authored: Tue Mar 13 10:56:52 2018 -0700
Committer: Vitalii Diravka <vi...@gmail.com>
Committed: Sat Mar 24 20:35:32 2018 +0200
----------------------------------------------------------------------
.../org/apache/drill/common/AutoCloseables.java | 5 +
.../common/concurrent/AutoCloseableLock.java | 6 +-
.../fn/registry/FunctionRegistryHolder.java | 24 +--
.../fn/registry/RemoteFunctionRegistry.java | 6 +-
.../exec/rpc/control/CustomHandlerRegistry.java | 5 +-
.../exec/store/sys/BasePersistentStore.java | 22 ---
.../drill/exec/store/sys/PersistentStore.java | 61 +-----
.../exec/store/sys/PersistentStoreProvider.java | 5 +-
.../org/apache/drill/exec/store/sys/Store.java | 58 ++++++
.../store/sys/VersionedPersistentStore.java | 57 ++++++
.../exec/store/sys/store/InMemoryStore.java | 85 ++-------
.../store/sys/store/LocalPersistentStore.java | 187 +++++++------------
.../sys/store/VersionedDelegatingStore.java | 120 ++++++++++++
.../sys/store/ZookeeperPersistentStore.java | 3 +-
.../ZookeeperPersistentStoreProvider.java | 20 ++
.../exec/testing/store/NoWriteLocalStore.java | 74 ++------
.../drill/exec/work/batch/IncomingBuffers.java | 4 +-
.../exec/store/sys/TestPStoreProviders.java | 6 +-
.../drill/exec/memory/AllocationManager.java | 11 +-
19 files changed, 401 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/common/src/main/java/org/apache/drill/common/AutoCloseables.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index fcdfe14..c12063c 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -25,6 +25,11 @@ import java.util.Collection;
*/
public class AutoCloseables {
+ public interface Closeable extends AutoCloseable {
+ @Override
+ void close();
+ }
+
public static AutoCloseable all(final Collection<? extends AutoCloseable> autoCloseables) {
return new AutoCloseable() {
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
index 91d50b4..3fe5c1e 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
@@ -19,10 +19,12 @@ package org.apache.drill.common.concurrent;
import java.util.concurrent.locks.Lock;
+import org.apache.drill.common.AutoCloseables.Closeable;
+
/**
* Simple wrapper class that allows Locks to be released via an try-with-resources block.
*/
-public class AutoCloseableLock implements AutoCloseable {
+public class AutoCloseableLock implements Closeable {
private final Lock lock;
@@ -30,7 +32,7 @@ public class AutoCloseableLock implements AutoCloseable {
this.lock = lock;
}
- public AutoCloseableLock open() {
+ public Closeable open() {
lock.lock();
return this;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index 3124539..1ab6e19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -22,6 +22,8 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
+
+import org.apache.drill.common.AutoCloseables.Closeable;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
@@ -104,7 +106,7 @@ public class FunctionRegistryHolder {
* @return local function registry version number
*/
public long getVersion() {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
return version;
}
}
@@ -121,7 +123,7 @@ public class FunctionRegistryHolder {
* @param newJars jars and list of their function holders, each contains function name, signature and holder
*/
public void addJars(Map<String, List<FunctionHolder>> newJars, long version) {
- try (AutoCloseableLock lock = writeLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) {
String jarName = newJar.getKey();
removeAllByJar(jarName);
@@ -141,7 +143,7 @@ public class FunctionRegistryHolder {
* @param jarName jar name to be removed
*/
public void removeJar(String jarName) {
- try (AutoCloseableLock lock = writeLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
removeAllByJar(jarName);
}
}
@@ -153,7 +155,7 @@ public class FunctionRegistryHolder {
* @return list of all jar names
*/
public List<String> getAllJarNames() {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
return Lists.newArrayList(jars.keySet());
}
}
@@ -167,7 +169,7 @@ public class FunctionRegistryHolder {
* @return list of functions names associated from the jar
*/
public List<String> getFunctionNamesByJar(String jarName) {
- try (AutoCloseableLock lock = readLock.open()){
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()){
Map<String, Queue<String>> functions = jars.get(jarName);
return functions == null ? Lists.<String>newArrayList() : Lists.newArrayList(functions.keySet());
}
@@ -184,7 +186,7 @@ public class FunctionRegistryHolder {
* @return all functions which their holders
*/
public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicLong version) {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
if (version != null) {
version.set(this.version);
}
@@ -215,7 +217,7 @@ public class FunctionRegistryHolder {
* @return all functions which their signatures
*/
public ListMultimap<String, String> getAllFunctionsWithSignatures() {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create();
for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) {
functionsWithSignatures.putAll(function.getKey(), Lists.newArrayList(function.getValue().keySet()));
@@ -235,7 +237,7 @@ public class FunctionRegistryHolder {
* @return list of function holders
*/
public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicLong version) {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
if (version != null) {
version.set(this.version);
}
@@ -263,7 +265,7 @@ public class FunctionRegistryHolder {
* @return true if jar exists, else false
*/
public boolean containsJar(String jarName) {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
return jars.containsKey(jarName);
}
}
@@ -275,7 +277,7 @@ public class FunctionRegistryHolder {
* @return quantity of functions
*/
public int functionsSize() {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
return functions.size();
}
}
@@ -291,7 +293,7 @@ public class FunctionRegistryHolder {
* @return jar name
*/
public String getJarNameByFunctionSignature(String functionName, String functionSignature) {
- try (AutoCloseableLock lock = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
for (Map.Entry<String, Map<String, Queue<String>>> jar : jars.entrySet()) {
Queue<String> functionSignatures = jar.getValue().get(functionName);
if (functionSignatures != null && functionSignatures.contains(functionSignature)) {
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index 38d8fcc..df5e17f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -32,9 +32,9 @@ import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.proto.SchemaUserBitShared;
import org.apache.drill.exec.proto.UserBitShared.Registry;
-import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
@@ -92,7 +92,7 @@ public class RemoteFunctionRegistry implements AutoCloseable {
private Path stagingArea;
private Path tmpArea;
- private PersistentStore<Registry> registry;
+ private VersionedPersistentStore<Registry> registry;
private TransientStore<String> unregistration;
private TransientStore<String> jars;
@@ -192,7 +192,7 @@ public class RemoteFunctionRegistry implements AutoCloseable {
.name("udf")
.persist()
.build();
- registry = storeProvider.getOrCreateStore(registrationConfig);
+ registry = storeProvider.getOrCreateVersionedStore(registrationConfig);
registry.putIfAbsent(registry_path, Registry.getDefaultInstance());
} catch (StoreException e) {
throw new DrillRuntimeException("Failure while loading remote registry.", e);
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
index 7a2bd04..97e855c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
@@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.drill.common.AutoCloseables.Closeable;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -60,7 +61,7 @@ public class CustomHandlerRegistry {
Preconditions.checkNotNull(handler);
Preconditions.checkNotNull(requestSerde);
Preconditions.checkNotNull(responseSerde);
- try (AutoCloseableLock lock = write.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = write.open()) {
ParsingHandler<?, ?> parsingHandler = handlers.get(messageTypeId);
if (parsingHandler != null) {
throw new IllegalStateException(String.format(
@@ -76,7 +77,7 @@ public class CustomHandlerRegistry {
public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException {
final ParsingHandler<?, ?> handler;
- try (AutoCloseableLock lock = read.open()) {
+ try (@SuppressWarnings("unused") Closeable lock = read.open()) {
handler = handlers.get(message.getType());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
index 0640407..38309bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.store.sys;
-import org.apache.drill.exec.store.sys.store.DataChangeVersion;
-
import java.util.Iterator;
import java.util.Map;
@@ -29,24 +27,4 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> {
return getRange(0, Integer.MAX_VALUE);
}
- /** By default contains with version will behave the same way as without version.
- * Override this method to add version support. */
- public boolean contains(String key, DataChangeVersion version) {
- return contains(key);
- }
-
- /** By default get with version will behave the same way as without version.
- * Override this method to add version support. */
- @Override
- public V get(String key, DataChangeVersion version) {
- return get(key);
- }
-
- /** By default put with version will behave the same way as without version.
- * Override this method to add version support. */
- @Override
- public void put(String key, V value, DataChangeVersion version) {
- put(key, value);
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
index 206642a..02959aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.store.sys;
-import org.apache.drill.exec.store.sys.store.DataChangeVersion;
-
import java.util.Iterator;
import java.util.Map;
@@ -27,11 +25,7 @@ import java.util.Map;
*
* @param <V> value type
*/
-public interface PersistentStore<V> extends AutoCloseable {
- /**
- * Returns storage {@link PersistentStoreMode mode} of this store.
- */
- PersistentStoreMode getMode();
+public interface PersistentStore<V> extends Store<V> {
/**
* Checks if lookup key is present in store.
@@ -42,30 +36,12 @@ public interface PersistentStore<V> extends AutoCloseable {
boolean contains(String key);
/**
- * Checks if lookup key is present in store.
- * Sets data change version number.
- *
- * @param key lookup key
- * @param version version holder
- * @return true if store contains lookup key, false otherwise
- */
- boolean contains(String key, DataChangeVersion version);
-
- /**
* Returns the value for the given key if exists, null otherwise.
* @param key lookup key
*/
V get(String key);
/**
- * Returns the value for the given key if exists, null otherwise.
- * Sets data change version number.
- * @param key lookup key
- * @param version version holder
- */
- V get(String key, DataChangeVersion version);
-
- /**
* Stores the (key, value) tuple in the store. Lifetime of the tuple depends upon store {@link #getMode mode}.
*
* @param key lookup key
@@ -74,41 +50,6 @@ public interface PersistentStore<V> extends AutoCloseable {
void put(String key, V value);
/**
- * Stores the (key, value) tuple in the store.
- * If tuple already exits, stores it only if versions match,
- * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException}
- * Lifetime of the tuple depends upon store {@link #getMode mode}.
- *
- * @param key lookup key
- * @param value value to store
- * @param version version holder
- */
- void put(String key, V value, DataChangeVersion version);
-
- /**
- * Removes the value corresponding to the given key if exists, nothing happens otherwise.
- * @param key lookup key
- */
- void delete(String key);
-
- /**
- * Stores the (key, value) tuple in the store only if it does not exists.
- *
- * @param key lookup key
- * @param value value to store
- * @return true if put takes place, false otherwise.
- */
- boolean putIfAbsent(String key, V value);
-
- /**
- * Returns an iterator of desired number of entries offsetting by the skip value.
- *
- * @param skip number of records to skip from beginning
- * @param take max number of records to return
- */
- Iterator<Map.Entry<String, V>> getRange(int skip, int take);
-
- /**
* Returns an iterator of entries.
*/
Iterator<Map.Entry<String, V>> getAll();
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
index 75b89b4..c0f7030 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.sys;
import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
/**
* A factory used to create {@link PersistentStore store} instances.
@@ -33,7 +34,9 @@ public interface PersistentStoreProvider extends AutoCloseable {
* @param <V> store value type
*/
<V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException;
-
+ default <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws StoreException {
+ return new VersionedDelegatingStore<>(getOrCreateStore(config));
+ }
/**
* Sets up the provider.
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java
new file mode 100644
index 0000000..c2b1999
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A Store interface used to store and retrieve instances of given value type.
+ *
+ * @param <V> value type
+ */
+public interface Store<V> extends AutoCloseable {
+ /**
+ * Returns storage {@link PersistentStoreMode mode} of this store.
+ */
+ PersistentStoreMode getMode();
+
+ /**
+ * Removes the value corresponding to the given key if exists, nothing happens otherwise.
+ * @param key lookup key
+ */
+ void delete(String key);
+
+ /**
+ * Stores the (key, value) tuple in the store only if it does not exists.
+ *
+ * @param key lookup key
+ * @param value value to store
+ * @return true if put takes place, false otherwise.
+ */
+ boolean putIfAbsent(String key, V value);
+
+ /**
+ * Returns an iterator of desired number of entries offsetting by the skip value.
+ *
+ * @param skip number of records to skip from beginning
+ * @param take max number of records to return
+ */
+ Iterator<Map.Entry<String, V>> getRange(int skip, int take);
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java
new file mode 100644
index 0000000..24fa78e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+
+/**
+ * Extension to the Store interface that supports versions
+ * @param <V>
+ */
+public interface VersionedPersistentStore<V> extends Store<V> {
+ /**
+ * Checks if lookup key is present in store.
+ * Sets data change version number.
+ *
+ * @param key lookup key
+ * @param version version holder
+ * @return true if store contains lookup key, false otherwise
+ */
+ boolean contains(String key, DataChangeVersion version);
+
+ /**
+ * Returns the value for the given key if exists, null otherwise.
+ * Sets data change version number.
+ * @param key lookup key
+ * @param version version holder
+ */
+ V get(String key, DataChangeVersion version);
+
+ /**
+ * Stores the (key, value) tuple in the store.
+ * If tuple already exits, stores it only if versions match,
+ * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException}
+ * Lifetime of the tuple depends upon store {@link #getMode mode}.
+ *
+ * @param key lookup key
+ * @param value value to store
+ * @param version version holder
+ */
+ void put(String key, V value, DataChangeVersion version);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
index 10da92d..f63c4f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
@@ -19,15 +19,11 @@ package org.apache.drill.exec.store.sys.store;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.drill.common.concurrent.AutoCloseableLock;
-import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.store.sys.BasePersistentStore;
-import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreMode;
import com.google.common.collect.Iterables;
@@ -35,26 +31,19 @@ import com.google.common.collect.Iterables;
public class InMemoryStore<V> extends BasePersistentStore<V> {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryPersistentStore.class);
- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
- private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
- private final ConcurrentSkipListMap<String, V> store;
- private int version = -1;
+ private final ConcurrentNavigableMap<String, V> store;
private final int capacity;
private final AtomicInteger currentSize = new AtomicInteger();
public InMemoryStore(int capacity) {
this.capacity = capacity;
//Allows us to trim out the oldest elements to maintain finite max size
- this.store = new ConcurrentSkipListMap<String, V>();
+ this.store = new ConcurrentSkipListMap<>();
}
@Override
public void delete(final String key) {
- try (AutoCloseableLock lock = writeLock.open()) {
- store.remove(key);
- version++;
- }
+ store.remove(key);
}
@Override
@@ -64,80 +53,36 @@ public class InMemoryStore<V> extends BasePersistentStore<V> {
@Override
public boolean contains(final String key) {
- return contains(key, null);
- }
-
- @Override
- public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = readLock.open()) {
- if (dataChangeVersion != null) {
- dataChangeVersion.setVersion(version);
- }
- return store.containsKey(key);
- }
+ return store.containsKey(key);
}
@Override
public V get(final String key) {
- return get(key, null);
- }
-
- @Override
- public V get(final String key, final DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = readLock.open()) {
- if (dataChangeVersion != null) {
- dataChangeVersion.setVersion(version);
- }
- return store.get(key);
- }
+ return store.get(key);
}
@Override
public void put(final String key, final V value) {
- put(key, value, null);
- }
-
- @Override
- public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = writeLock.open()) {
- if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
- throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
- }
- store.put(key, value);
- if (currentSize.incrementAndGet() > capacity) {
- //Pop Out Oldest
- store.pollLastEntry();
- currentSize.decrementAndGet();
- }
-
- version++;
+ store.put(key, value);
+ if (currentSize.incrementAndGet() > capacity) {
+ //Pop Out Oldest
+ store.pollLastEntry();
+ currentSize.decrementAndGet();
}
}
@Override
public boolean putIfAbsent(final String key, final V value) {
- try (AutoCloseableLock lock = writeLock.open()) {
- final V old = store.putIfAbsent(key, value);
- if (old == null) {
- version++;
- return true;
- }
- return false;
- }
+ return (value != store.putIfAbsent(key, value));
}
@Override
public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
- try (AutoCloseableLock lock = readLock.open()) {
- return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
- }
+ return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
}
@Override
- public void close() throws Exception {
- try (AutoCloseableLock lock = writeLock.open()) {
- store.clear();
- version = -1;
- }
+ public void close() {
+ store.clear();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
index 313a9be..0905c0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
@@ -35,9 +35,7 @@ import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.drill.common.collections.ImmutableEntry;
-import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.store.sys.BasePersistentStore;
@@ -59,35 +57,34 @@ import org.slf4j.LoggerFactory;
public class LocalPersistentStore<V> extends BasePersistentStore<V> {
private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class);
- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
- private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
-
private final Path basePath;
private final PersistentStoreConfig<V> config;
private final DrillFileSystem fs;
- private int version = -1;
public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) {
- super();
this.basePath = new Path(base, config.getName());
this.config = config;
this.fs = fs;
-
try {
- if (!fs.mkdirs(basePath)) {
- version++;
- }
+ mkdirs(getBasePath());
} catch (IOException e) {
throw new RuntimeException("Failure setting pstore configuration path.");
}
}
+ protected Path getBasePath() {
+ return basePath;
+ }
+
@Override
public PersistentStoreMode getMode() {
return PersistentStoreMode.PERSISTENT;
}
+ private void mkdirs(Path path) throws IOException {
+ fs.mkdirs(path);
+ }
+
public static Path getLogDir() {
String drillLogDir = System.getenv("DRILL_LOG_DIR");
if (drillLogDir == null) {
@@ -114,39 +111,37 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
@Override
public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
- try (AutoCloseableLock lock = readLock.open()) {
- try {
- // list only files with sys file suffix
- PathFilter sysFileSuffixFilter = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX);
- }
- };
-
- List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter);
- if (fileStatuses.isEmpty()) {
- return Collections.emptyIterator();
- }
-
- List<String> files = Lists.newArrayList();
- for (FileStatus stat : fileStatuses) {
- String s = stat.getPath().getName();
- files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+ try {
+ // list only files with sys file suffix
+ PathFilter sysFileSuffixFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX);
}
+ };
- Collections.sort(files);
+ List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter);
+ if (fileStatuses.isEmpty()) {
+ return Collections.emptyIterator();
+ }
- return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
- @Nullable
- @Override
- public Entry<String, V> apply(String key) {
- return new ImmutableEntry<>(key, get(key));
- }
- }).iterator();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ List<String> files = Lists.newArrayList();
+ for (FileStatus stat : fileStatuses) {
+ String s = stat.getPath().getName();
+ files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
}
+
+ Collections.sort(files);
+
+ return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
+ @Nullable
+ @Override
+ public Entry<String, V> apply(String key) {
+ return new ImmutableEntry<>(key, get(key));
+ }
+ }).iterator();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@@ -160,108 +155,68 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
@Override
public boolean contains(String key) {
- return contains(key, null);
- }
-
- @Override
- public boolean contains(String key, DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = readLock.open()) {
- try {
- Path path = makePath(key);
- boolean exists = fs.exists(path);
- if (exists && dataChangeVersion != null) {
- dataChangeVersion.setVersion(version);
- }
- return exists;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ try {
+ return fs.exists(makePath(key));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@Override
public V get(String key) {
- return get(key, null);
- }
-
- @Override
- public V get(String key, DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = readLock.open()) {
- try {
- if (dataChangeVersion != null) {
- dataChangeVersion.setVersion(version);
- }
- Path path = makePath(key);
- if (!fs.exists(path)) {
- return null;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- final Path path = makePath(key);
- try (InputStream is = fs.open(path)) {
- return config.getSerializer().deserialize(IOUtils.toByteArray(is));
- } catch (IOException e) {
- throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
+ try {
+ Path path = makePath(key);
+ if (!fs.exists(path)) {
+ return null;
}
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ final Path path = makePath(key);
+ try (InputStream is = fs.open(path)) {
+ return config.getSerializer().deserialize(IOUtils.toByteArray(is));
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
}
}
@Override
public void put(String key, V value) {
- put(key, value, null);
- }
-
- @Override
- public void put(String key, V value, DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = writeLock.open()) {
- if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
- throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
- }
- try (OutputStream os = fs.create(makePath(key))) {
- IOUtils.write(config.getSerializer().serialize(value), os);
- version++;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ try (OutputStream os = fs.create(makePath(key))) {
+ IOUtils.write(config.getSerializer().serialize(value), os);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@Override
public boolean putIfAbsent(String key, V value) {
- try (AutoCloseableLock lock = writeLock.open()) {
- try {
- Path p = makePath(key);
- if (fs.exists(p)) {
- return false;
- } else {
- try (OutputStream os = fs.create(makePath(key))) {
- IOUtils.write(config.getSerializer().serialize(value), os);
- version++;
- }
- return true;
+ try {
+ Path p = makePath(key);
+ if (fs.exists(p)) {
+ return false;
+ } else {
+ try (OutputStream os = fs.create(makePath(key))) {
+ IOUtils.write(config.getSerializer().serialize(value), os);
}
- } catch (IOException e) {
- throw new RuntimeException(e);
+ return true;
}
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@Override
public void delete(String key) {
- try (AutoCloseableLock lock = writeLock.open()) {
- try {
- fs.delete(makePath(key), false);
- version++;
- } catch (IOException e) {
- logger.error("Unable to delete data from storage.", e);
- throw new RuntimeException(e);
- }
+ try {
+ fs.delete(makePath(key), false);
+ } catch (IOException e) {
+ logger.error("Unable to delete data from storage.", e);
+ throw new RuntimeException(e);
}
}
@Override
public void close() {
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
new file mode 100644
index 0000000..23eedd9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.AutoCloseables.Closeable;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
+
+/**
+ * Versioned Store that delegates operations to PersistentStore
+ * @param <V>
+ */
+public class VersionedDelegatingStore<V> implements VersionedPersistentStore<V> {
+ private final PersistentStore<V> store;
+ private final ReadWriteLock readWriteLock;
+ private final AutoCloseableLock readLock;
+ private final AutoCloseableLock writeLock;
+ private int version;
+
+ public VersionedDelegatingStore(PersistentStore<V> store) {
+ this.store = store;
+ readWriteLock = new ReentrantReadWriteLock();
+ readLock = new AutoCloseableLock(readWriteLock.readLock());
+ writeLock = new AutoCloseableLock(readWriteLock.writeLock());
+ version = -1;
+ }
+
+ @Override
+ public PersistentStoreMode getMode() {
+ return store.getMode();
+ }
+
+ @Override
+ public void delete(final String key) {
+ try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+ store.delete(key);
+ version++;
+ }
+ }
+
+ @Override
+ public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
+ boolean contains = store.contains(key);
+ dataChangeVersion.setVersion(version);
+ return contains;
+ }
+ }
+
+ @Override
+ public V get(final String key, final DataChangeVersion dataChangeVersion) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
+ V value = store.get(key);
+ dataChangeVersion.setVersion(version);
+ return value;
+ }
+ }
+
+ @Override
+ public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
+ try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+ if (dataChangeVersion.getVersion() != version) {
+ throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
+ }
+ store.put(key, value);
+ version++;
+ }
+ }
+
+ @Override
+ public boolean putIfAbsent(String key, V value) {
+ try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+ if (store.putIfAbsent(key, value)) {
+ version++;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
+ try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
+ return store.getRange(skip, take);
+ }
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+ store.close();
+ version = -1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
index a3ee58e..1f20212 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
@@ -36,12 +36,13 @@ import org.apache.drill.exec.serialization.InstanceSerializer;
import org.apache.drill.exec.store.sys.BasePersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.zookeeper.CreateMode;
/**
* Zookeeper based implementation of {@link org.apache.drill.exec.store.sys.PersistentStore}.
*/
-public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
+public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> implements VersionedPersistentStore<V> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperPersistentStore.class);
private final PersistentStoreConfig<V> config;
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
index a5502cb..a2e30f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
@@ -28,7 +28,9 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
import org.apache.hadoop.fs.Path;
@@ -81,6 +83,24 @@ public class ZookeeperPersistentStoreProvider extends BasePersistentStoreProvide
}
@Override
+ public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(final PersistentStoreConfig<V> config) throws StoreException {
+ switch(config.getMode()){
+ case BLOB_PERSISTENT:
+ return new VersionedDelegatingStore<>(new LocalPersistentStore<>(fs, blobRoot, config));
+ case PERSISTENT:
+ final ZookeeperPersistentStore<V> store = new ZookeeperPersistentStore<>(curator, config);
+ try {
+ store.start();
+ } catch (Exception e) {
+ throw new StoreException("unable to start zookeeper store", e);
+ }
+ return store;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
public void close() throws Exception {
fs.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
index e36dc83..528705a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
@@ -20,30 +20,19 @@ package org.apache.drill.exec.testing.store;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import org.apache.drill.common.concurrent.AutoCloseableLock;
-import org.apache.drill.exec.exception.VersionMismatchException;
+
import org.apache.drill.exec.store.sys.BasePersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreMode;
-import org.apache.drill.exec.store.sys.store.DataChangeVersion;
public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
- private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
private final ConcurrentMap<String, V> store = Maps.newConcurrentMap();
- private int version = -1;
@Override
public void delete(final String key) {
- try (AutoCloseableLock lock = writeLock.open()) {
- store.remove(key);
- version++;
- }
+ store.remove(key);
}
@Override
@@ -53,74 +42,35 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
@Override
public boolean contains(final String key) {
- return contains(key, null);
- }
-
- @Override
- public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = readLock.open()) {
- if (dataChangeVersion != null) {
- dataChangeVersion.setVersion(version);
- }
- return store.containsKey(key);
- }
+ return store.containsKey(key);
}
@Override
public V get(final String key) {
- return get(key, null);
- }
-
- @Override
- public V get(final String key, final DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = readLock.open()) {
- if (dataChangeVersion != null) {
- dataChangeVersion.setVersion(version);
- }
- return store.get(key);
- }
+ return store.get(key);
}
@Override
public void put(final String key, final V value) {
- put(key, value, null);
- }
-
- @Override
- public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
- try (AutoCloseableLock lock = writeLock.open()) {
- if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
- throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
- }
- store.put(key, value);
- version++;
- }
+ store.put(key, value);
}
@Override
public boolean putIfAbsent(final String key, final V value) {
- try (AutoCloseableLock lock = writeLock.open()) {
- final V old = store.putIfAbsent(key, value);
- if (old == null) {
- version++;
- return true;
- }
- return false;
+ final V old = store.putIfAbsent(key, value);
+ if (old == null) {
+ return true;
}
+ return false;
}
@Override
public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
- try (AutoCloseableLock lock = readLock.open()) {
- return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
- }
+ return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
}
@Override
- public void close() throws Exception {
- try (AutoCloseableLock lock = writeLock.open()) {
- store.clear();
- version = -1;
- }
+ public void close() {
+ store.clear();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index a516fad..5182093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -86,7 +86,7 @@ public class IncomingBuffers implements AutoCloseable {
// we want to make sure that we only generate local record batch reference in the case that we're not closed.
// Otherwise we would leak memory.
- try (AutoCloseableLock lock = sharedIncomingBatchLock.open()) {
+ try (@SuppressWarnings("unused") AutoCloseables.Closeable lock = sharedIncomingBatchLock.open()) {
if (closed) {
return false;
}
@@ -135,7 +135,7 @@ public class IncomingBuffers implements AutoCloseable {
@Override
public void close() throws Exception {
- try (AutoCloseableLock lock = exclusiveCloseLock.open()) {
+ try (@SuppressWarnings("unused") AutoCloseables.Closeable lock = exclusiveCloseLock.open()) {
closed = true;
AutoCloseables.close(collectorMap.values());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index 8896fb0..73ddfe0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.coord.zk.PathUtils;
import org.apache.drill.exec.coord.zk.ZookeeperClient;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.PersistedOptionValue;
+import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
import org.apache.drill.test.BaseDirTestWatcher;
@@ -45,6 +46,8 @@ import org.junit.experimental.categories.Category;
import java.io.File;
+import static org.junit.Assert.assertTrue;
+
@Category({SlowTest.class})
public class TestPStoreProviders extends TestWithZookeeper {
@Rule
@@ -133,8 +136,9 @@ public class TestPStoreProviders extends TestWithZookeeper {
try (ZookeeperPersistentStoreProvider provider =
new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator)) {
PersistentStore<PersistedOptionValue> store = provider.getOrCreateStore(storeConfig);
+ assertTrue(store instanceof ZookeeperPersistentStore);
- PersistedOptionValue oldOptionValue = store.get(oldName, null);
+ PersistedOptionValue oldOptionValue = ((ZookeeperPersistentStore<PersistedOptionValue>)store).get(oldName, null);
PersistedOptionValue expectedValue = new PersistedOptionValue("true");
Assert.assertEquals(expectedValue, oldOptionValue);
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
index 3b5967f..e9f35cc 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.drill.common.AutoCloseables.Closeable;
import org.apache.drill.common.HistoricalLog;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
@@ -109,7 +110,7 @@ public class AllocationManager {
"A buffer can only be associated between two allocators that share the same root.");
}
- try (AutoCloseableLock read = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable read = readLock.open()) {
final BufferLedger ledger = map.get(allocator);
if (ledger != null) {
@@ -119,7 +120,7 @@ public class AllocationManager {
return ledger;
}
}
- try (AutoCloseableLock write = writeLock.open()) {
+ try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
// we have to recheck existing ledger since a second reader => writer could be competing with us.
final BufferLedger existingLedger = map.get(allocator);
@@ -242,7 +243,7 @@ public class AllocationManager {
// since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
// that this won't happen by synchronizing on the allocator manager instance.
- try (AutoCloseableLock write = writeLock.open()) {
+ try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
if (owningLedger != this) {
return true;
}
@@ -320,7 +321,7 @@ public class AllocationManager {
allocator.assertOpen();
final int outcome;
- try (AutoCloseableLock write = writeLock.open()) {
+ try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
@@ -424,7 +425,7 @@ public class AllocationManager {
* @return Amount of accounted(owned) memory associated with this ledger.
*/
public int getAccountedSize() {
- try (AutoCloseableLock read = readLock.open()) {
+ try (@SuppressWarnings("unused") Closeable read = readLock.open()) {
if (owningLedger == this) {
return size;
} else {