You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/16 23:31:39 UTC
[30/32] git commit: Switch Infinispan to SYNC mode and add cache
future handling.
Switch Infinispan to SYNC mode and add cache future handling.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1568fc63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1568fc63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1568fc63
Branch: refs/heads/master
Commit: 1568fc63e4fa2e6c12cc38012334357fc0439da2
Parents: 6b9a1c5
Author: Steven Phillips <sp...@maprtech.com>
Authored: Mon Jun 9 02:35:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 16 13:50:45 2014 -0700
----------------------------------------------------------------------
.../apache/drill/exec/cache/DistributedMap.java | 9 +--
.../drill/exec/cache/DistributedMultiMap.java | 4 +-
.../drill/exec/cache/infinispan/ICache.java | 62 ++++++++++++++++----
.../drill/exec/cache/local/LocalCache.java | 53 ++++++++++++++---
.../apache/drill/exec/work/foreman/Foreman.java | 24 ++++++--
5 files changed, 123 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
index 2411434..7d3ca9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -18,15 +18,16 @@
package org.apache.drill.exec.cache;
import java.util.Map;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public interface DistributedMap<K, V>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
public V get(K key);
- public void put(K key, V value);
- public void delete(K key);
- public void putIfAbsent(K key, V value);
- public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit);
+ public Future<V> put(K key, V value);
+ public Future<V> delete(K key);
+ public Future<V> putIfAbsent(K key, V value);
+ public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit);
public Iterable<Map.Entry<K, V>> getLocalEntries();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
index bf06646..214a871 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
@@ -18,9 +18,11 @@
package org.apache.drill.exec.cache;
import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
public interface DistributedMultiMap<K, V> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class);
public Collection<V> get(K key);
- public void put(K key, V value);
+ public Future<Boolean> put(K key, V value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
index 651fc04..d3b63db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Maps;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.Counter;
@@ -54,7 +55,6 @@ import org.jgroups.protocols.COUNTER;
import org.jgroups.protocols.FRAG2;
import org.jgroups.stack.ProtocolStack;
-import com.google.common.collect.Maps;
public class ICache implements DistributedCache{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class);
@@ -69,7 +69,7 @@ public class ICache implements DistributedCache{
String clusterName = config.getString(ExecConstants.SERVICE_NAME);
this.local = local;
- final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_ASYNC;
+ final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_SYNC;
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
if(!local){
@@ -209,23 +209,23 @@ public class ICache implements DistributedCache{
}
@Override
- public void delete(K key) {
- cache.remove(key);
+ public Future<V> delete(K key) {
+ return cache.removeAsync(key);
}
@Override
- public void put(K key, V value) {
- cache.put(key, value);
+ public Future<V> put(K key, V value) {
+ return cache.putAsync(key, value);
}
@Override
- public void putIfAbsent(K key, V value) {
- cache.putIfAbsent(key, value);
+ public Future<V> putIfAbsent(K key, V value) {
+ return cache.putIfAbsentAsync(key, value);
}
@Override
- public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
- cache.putIfAbsent(key, value, ttl, timeUnit);
+ public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
+ return cache.putIfAbsentAsync(key, value, ttl, timeUnit);
}
}
@@ -247,16 +247,52 @@ public class ICache implements DistributedCache{
}
@Override
- public void put(K key, V value) {
- cache.put(key, new DeltaList<V>(value));
+ public Future<Boolean> put(K key, V value) {
+ return new ICacheFuture(cache.putAsync(key, new DeltaList(value)));
+ }
+
+ }
+
+ public static class ICacheFuture implements Future<Boolean> {
+
+ Future future;
+
+ public ICacheFuture(Future future) {
+ this.future = future;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return future.cancel(mayInterruptIfRunning);
}
+ @Override
+ public boolean isCancelled() {
+ return future.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return future.isDone();
+ }
+
+ @Override
+ public Boolean get() throws InterruptedException, ExecutionException {
+ future.get();
+ return true;
+ }
+
+ @Override
+ public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ future.get(timeout, unit);
+ return true;
+ }
}
- private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta{
+ private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta, List<V> {
/** The serialVersionUID */
private static final long serialVersionUID = 2176345973026460708L;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
index e61cd76..31ab909 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
@@ -29,8 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.common.config.DrillConfig;
@@ -235,8 +234,43 @@ public class LocalCache implements DistributedCache {
}
@Override
- public void put(K key, V value) {
+ public Future<Boolean> put(K key, V value) {
mmap.put(serialize(key, config.getMode()), serialize(value, config.getMode()));
+ return new LocalCacheFuture(true);
+ }
+ }
+
+ public static class LocalCacheFuture<V> implements Future<V> {
+
+ V value;
+
+ public LocalCacheFuture(V value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ return value;
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return value;
}
}
@@ -268,25 +302,30 @@ public class LocalCache implements DistributedCache {
}
@Override
- public void put(K key, V value) {
+ public Future<V> put(K key, V value) {
m.put(serialize(key, config.getMode()), serialize(value, config.getMode()));
+ return new LocalCacheFuture(value);
}
@Override
- public void putIfAbsent(K key, V value) {
+ public Future<V> putIfAbsent(K key, V value) {
m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode()));
+ return new LocalCacheFuture(value);
}
@Override
- public void delete(K key) {
+ public Future<V> delete(K key) {
+ V value = get(key);
m.remove(serialize(key, config.getMode()));
+ return new LocalCacheFuture(value);
}
@Override
- public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
+ public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode()));
logger.warn("Expiration not implemented in local map cache");
+ return new LocalCacheFuture<V>(value);
}
private class DeserializingTransformer implements Iterator<Map.Entry<K, V>> {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 3048f77..1629f8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -21,13 +21,20 @@ import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.CachedVectorContainer;
import org.apache.drill.exec.cache.DistributedCache.CacheConfig;
import org.apache.drill.exec.cache.DistributedCache.SerializationMode;
import org.apache.drill.exec.coord.DistributedSemaphore;
@@ -347,10 +354,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
// store fragments in distributed grid.
logger.debug("Storing fragments");
+ List<Future<PlanFragment>> queue = new LinkedList<>();
for (PlanFragment f : work.getFragments()) {
// store all fragments in grid since they are part of handshake.
- context.getCache().getMap(FRAGMENT_CACHE).put(f.getHandle(), f);
+ queue.add(context.getCache().getMap(FRAGMENT_CACHE).put(f.getHandle(), f));
if (f.getLeafFragment()) {
leafFragments.add(f);
} else {
@@ -358,9 +366,14 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
- int totalFragments = 1 + intermediateFragments.size() + leafFragments.size();
- fragmentManager.getStatus().setTotalFragments(totalFragments);
- fragmentManager.getStatus().updateCache();
+ for (Future<PlanFragment> f : queue) {
+ try {
+ f.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new ExecutionSetupException("failure while storing plan fragments", e);
+ }
+ }
+
logger.debug("Fragments stored.");
logger.debug("Submitting fragments to run.");
@@ -368,6 +381,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
logger.debug("Fragments running.");
state.updateState(QueryState.PENDING, QueryState.RUNNING);
+ int totalFragments = 1 + intermediateFragments.size() + leafFragments.size();
+ fragmentManager.getStatus().setTotalFragments(totalFragments);
+ fragmentManager.getStatus().updateCache();
} catch (Exception e) {
fail("Failure while setting up query.", e);