You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/01/09 22:06:29 UTC
drill git commit: DRILL-1947: Cache PStore/EStore instances rather
than recreating on each need. As part of this,
make sure that PStoreConfig doesn't use identity equality.
Repository: drill
Updated Branches:
refs/heads/master 1552c96f4 -> 7638dbb82
DRILL-1947: Cache PStore/EStore instances rather than recreating on each need. As part of this, make sure that PStoreConfig doesn't use identity equality.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7638dbb8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7638dbb8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7638dbb8
Branch: refs/heads/master
Commit: 7638dbb82606c9644d6ad02210fbfd5d8f6ae090
Parents: 1552c96
Author: Jacques Nadeau <ja...@apache.org>
Authored: Tue Jan 6 21:52:40 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Jan 9 08:15:53 2015 -0800
----------------------------------------------------------------------
.../exec/store/hbase/config/HBasePStore.java | 4 ++
.../exec/store/mongo/config/MongoPStore.java | 4 ++
.../org/apache/drill/exec/server/Drillbit.java | 9 ++-
.../exec/store/sys/CachingStoreProvider.java | 70 ++++++++++++++++++++
.../drill/exec/store/sys/EStoreProvider.java | 4 +-
.../org/apache/drill/exec/store/sys/PStore.java | 2 +
.../drill/exec/store/sys/PStoreConfig.java | 47 +++++++++++++
.../drill/exec/store/sys/PStoreProvider.java | 4 +-
.../drill/exec/store/sys/PStoreRegistry.java | 2 +-
.../drill/exec/store/sys/local/FilePStore.java | 4 ++
.../store/sys/local/LocalEStoreProvider.java | 23 +++----
.../drill/exec/store/sys/local/MapEStore.java | 6 +-
.../store/sys/local/NoWriteLocalPStore.java | 4 ++
.../store/sys/serialize/JacksonSerializer.java | 40 +++++++++++
.../store/sys/serialize/ProtoSerializer.java | 38 +++++++++++
.../exec/store/sys/zk/ZkAbstractStore.java | 17 ++++-
.../exec/store/sys/zk/ZkEStoreProvider.java | 8 +++
.../exec/store/sys/zk/ZkPStoreProvider.java | 14 ++--
18 files changed, 265 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
index 594b9f8..17ddcb1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
@@ -229,4 +229,8 @@ public class HBasePStore<V> implements PStore<V> {
}
+ @Override
+ public void close() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
index fc5c05b..ea3a5a2 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java
@@ -177,4 +177,8 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants {
throw new UnsupportedOperationException();
}
}
+
+ @Override
+ public void close() {
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 4b9b20d..67342c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.rest.DrillRestServer;
import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.store.sys.CachingStoreProvider;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.store.sys.PStoreRegistry;
import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
@@ -106,7 +107,7 @@ public class Drillbit implements Closeable{
if(serviceSet != null) {
this.coord = serviceSet.getCoordinator();
- this.storeProvider = new LocalPStoreProvider(config);
+ this.storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
} else {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
this.coord = new ZKClusterCoordinator(config);
@@ -175,7 +176,11 @@ public class Drillbit implements Closeable{
logger.warn("Failure while shutting down embedded jetty server.");
}
Closeables.closeQuietly(engine);
- Closeables.closeQuietly(storeProvider);
+ try{
+ storeProvider.close();
+ }catch(Exception e){
+ logger.warn("Failure while closing store provider.", e);
+ }
Closeables.closeQuietly(coord);
Closeables.closeQuietly(manager);
Closeables.closeQuietly(context);
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
new file mode 100644
index 0000000..68440cb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class CachingStoreProvider implements PStoreProvider, AutoCloseable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class);
+
+ private final ConcurrentMap<PStoreConfig<?>, PStore<?>> storeCache = Maps.newConcurrentMap();
+ private final PStoreProvider provider;
+
+ public CachingStoreProvider(PStoreProvider provider) {
+ super();
+ this.provider = provider;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
+ PStore<?> s = storeCache.get(config);
+ if(s == null){
+ PStore<?> newStore = provider.getStore(config);
+ s = storeCache.putIfAbsent(config, newStore);
+ if(s == null){
+ s = newStore;
+ }else{
+ newStore.close();
+ }
+ }
+
+ return (PStore<V>) s;
+
+ }
+
+ @Override
+ public void start() throws IOException {
+ provider.start();
+ }
+
+ @Override
+ public void close() throws Exception {
+ for(PStore<?> store : storeCache.values()){
+ store.close();
+ }
+ storeCache.clear();
+ provider.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
index b09c5b4..4c79a28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
@@ -18,12 +18,10 @@
package org.apache.drill.exec.store.sys;
-import java.io.IOException;
/**
* Interface to define the provider which return EStore.
*/
-public interface EStoreProvider {
- public <V> PStore<V> getStore(PStoreConfig<V> table) throws IOException;
+public interface EStoreProvider extends PStoreProvider {
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
index 26c00ea..b629645 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.sys;
import java.util.Map;
+
/**
* Interface for reading and writing values to a persistent storage provider. Iterators are guaranteed to be returned in key order.
* @param <V>
@@ -28,4 +29,5 @@ public interface PStore<V> extends Iterable<Map.Entry<String, V>> {
public void put(String key, V value);
public boolean putIfAbsent(String key, V value);
public void delete(String key);
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
index 83c2243..bd9d977 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
@@ -116,4 +116,51 @@ public class PStoreConfig<V> {
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + maxIteratorSize;
+ result = prime * result + ((mode == null) ? 0 : mode.hashCode());
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((valueSerializer == null) ? 0 : valueSerializer.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ PStoreConfig other = (PStoreConfig) obj;
+ if (maxIteratorSize != other.maxIteratorSize) {
+ return false;
+ }
+ if (mode != other.mode) {
+ return false;
+ }
+ if (name == null) {
+ if (other.name != null) {
+ return false;
+ }
+ } else if (!name.equals(other.name)) {
+ return false;
+ }
+ if (valueSerializer == null) {
+ if (other.valueSerializer != null) {
+ return false;
+ }
+ } else if (!valueSerializer.equals(other.valueSerializer)) {
+ return false;
+ }
+ return true;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
index 6371dfa..efa223e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
@@ -17,11 +17,9 @@
*/
package org.apache.drill.exec.store.sys;
-import java.io.Closeable;
import java.io.IOException;
-public interface PStoreProvider extends AutoCloseable, Closeable{
-
+public interface PStoreProvider extends AutoCloseable {
public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException;
public void start() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
index 580d20a..532e6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
@@ -53,7 +53,7 @@ public class PStoreRegistry {
logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName);
Class<? extends PStoreProvider> storeProviderClass = (Class<? extends PStoreProvider>) Class.forName(storeProviderClassName);
Constructor<? extends PStoreProvider> c = storeProviderClass.getConstructor(PStoreRegistry.class);
- return c.newInstance(this);
+ return new CachingStoreProvider(c.newInstance(this));
} catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException
| InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
logger.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
index 416a21a..40f25e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -228,4 +228,8 @@ public class FilePStore<V> implements PStore<V> {
}
}
+ @Override
+ public void close() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
index 094d093..e7c2f94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
@@ -19,7 +19,6 @@
package org.apache.drill.exec.store.sys.local;
import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.EStoreProvider;
@@ -27,24 +26,22 @@ import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
public class LocalEStoreProvider implements EStoreProvider{
- private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = Maps.newConcurrentMap();
@Override
public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral.");
- if (! (estores.containsKey(storeConfig)) ) {
- EStore<V> p = new MapEStore<V>();
- EStore<?> p2 = estores.putIfAbsent(storeConfig, p);
- if(p2 != null) {
- return (EStore<V>) p2;
- }
- return p;
- } else {
- return (EStore<V>) estores.get(storeConfig);
- }
+ return new MapEStore<V>();
}
+
+ @Override
+ public void start() throws IOException {
+ }
+
+ @Override
+ public void close() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
index 2723916..96e51e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
* Implementation of EStore using ConcurrentHashMap.
* @param <V>
*/
-public class MapEStore <V> implements EStore<V> {
+public class MapEStore<V> implements EStore<V> {
ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
@Override
@@ -57,4 +57,8 @@ public class MapEStore <V> implements EStore<V> {
V out = store.putIfAbsent(key, value);
return out == null;
}
+
+ @Override
+ public void close() {
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
index 71a41f0..c675618 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
@@ -62,4 +62,8 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
blobMap.remove(key);
}
+ @Override
+ public void close() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
index b8a5cdd..53452f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
@@ -43,4 +43,44 @@ public class JacksonSerializer<X> implements PClassSerializer<X> {
public X deserialize(byte[] bytes) throws IOException {
return reader.readValue(bytes);
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((reader == null) ? 0 : reader.hashCode());
+ result = prime * result + ((writer == null) ? 0 : writer.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ JacksonSerializer other = (JacksonSerializer) obj;
+ if (reader == null) {
+ if (other.reader != null) {
+ return false;
+ }
+ } else if (!reader.equals(other.reader)) {
+ return false;
+ }
+ if (writer == null) {
+ if (other.writer != null) {
+ return false;
+ }
+ } else if (!writer.equals(other.writer)) {
+ return false;
+ }
+ return true;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
index 1ea714e..52df7a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
@@ -51,5 +51,43 @@ public class ProtoSerializer<X, B extends Message.Builder> implements PClassSeri
return (X) b.build();
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((readSchema == null) ? 0 : readSchema.hashCode());
+ result = prime * result + ((writeSchema == null) ? 0 : writeSchema.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ ProtoSerializer other = (ProtoSerializer) obj;
+ if (readSchema == null) {
+ if (other.readSchema != null) {
+ return false;
+ }
+ } else if (!readSchema.equals(other.readSchema)) {
+ return false;
+ }
+ if (writeSchema == null) {
+ if (other.writeSchema != null) {
+ return false;
+ }
+ } else if (!writeSchema.equals(other.writeSchema)) {
+ return false;
+ }
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
index d61f3b4..01059a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
@@ -27,9 +27,8 @@ import java.util.Map.Entry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.zookeeper.CreateMode;
@@ -40,7 +39,8 @@ import com.google.common.collect.Lists;
* This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store)
* @param <V>
*/
-public abstract class ZkAbstractStore<V> {
+public abstract class ZkAbstractStore<V> implements AutoCloseable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class);
protected CuratorFramework framework;
protected PStoreConfig<V> config;
@@ -206,4 +206,15 @@ public abstract class ZkAbstractStore<V> {
}
}
+
+ @Override
+ public void close() {
+ try{
+ childrenCache.close();
+ }catch(IOException e){
+ logger.warn("Failure while closing out abstract store.", e);
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
index 1c2c3fd..7d7d475 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
@@ -40,4 +40,12 @@ public class ZkEStoreProvider implements EStoreProvider{
Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL);
return new ZkEStore<V>(curator,store);
}
+
+ @Override
+ public void start() throws IOException {
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index 03d2441..f8fa2bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.sys.zk;
-import java.io.File;
import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
@@ -26,7 +25,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
-import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.EStoreProvider;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -45,7 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider {
private final DrillFileSystem fs;
private final Path blobRoot;
- private final ZkEStoreProvider zkEStoreProvider;
+ private final EStoreProvider zkEStoreProvider;
public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
ClusterCoordinator coord = registry.getClusterCoordinator();
@@ -66,7 +65,6 @@ public class ZkPStoreProvider implements PStoreProvider {
throw new DrillbitStartupException("Failure while attempting to set up blob store.", e);
}
-
this.zkEStoreProvider = new ZkEStoreProvider(curator);
}
@@ -79,11 +77,6 @@ public class ZkPStoreProvider implements PStoreProvider {
}
@Override
- public void close() {
- }
-
-
- @Override
public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
switch(config.getMode()){
case BLOB_PERSISTENT:
@@ -101,4 +94,7 @@ public class ZkPStoreProvider implements PStoreProvider {
public void start() {
}
+ @Override
+ public void close() {
+ }
}