You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/16 23:31:39 UTC

[30/32] git commit: Switch Infinispan to SYNC mode and add cache future handling.

Switch Infinispan to SYNC mode and add cache future handling.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1568fc63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1568fc63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1568fc63

Branch: refs/heads/master
Commit: 1568fc63e4fa2e6c12cc38012334357fc0439da2
Parents: 6b9a1c5
Author: Steven Phillips <sp...@maprtech.com>
Authored: Mon Jun 9 02:35:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 16 13:50:45 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/cache/DistributedMap.java |  9 +--
 .../drill/exec/cache/DistributedMultiMap.java   |  4 +-
 .../drill/exec/cache/infinispan/ICache.java     | 62 ++++++++++++++++----
 .../drill/exec/cache/local/LocalCache.java      | 53 ++++++++++++++---
 .../apache/drill/exec/work/foreman/Foreman.java | 24 ++++++--
 5 files changed, 123 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
index 2411434..7d3ca9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -18,15 +18,16 @@
 package org.apache.drill.exec.cache;
 
 import java.util.Map;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 public interface DistributedMap<K, V>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
   public V get(K key);
-  public void put(K key, V value);
-  public void delete(K key);
-  public void putIfAbsent(K key, V value);
-  public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit);
+  public Future<V> put(K key, V value);
+  public Future<V> delete(K key);
+  public Future<V> putIfAbsent(K key, V value);
+  public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit);
   public Iterable<Map.Entry<K, V>> getLocalEntries();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
index bf06646..214a871 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
@@ -18,9 +18,11 @@
 package org.apache.drill.exec.cache;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
 
 public interface DistributedMultiMap<K, V> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class);
   public Collection<V> get(K key);
-  public void put(K key, V value);
+  public Future<Boolean> put(K key, V value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
index 651fc04..d3b63db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Maps;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.Counter;
@@ -54,7 +55,6 @@ import org.jgroups.protocols.COUNTER;
 import org.jgroups.protocols.FRAG2;
 import org.jgroups.stack.ProtocolStack;
 
-import com.google.common.collect.Maps;
 
 public class ICache implements DistributedCache{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class);
@@ -69,7 +69,7 @@ public class ICache implements DistributedCache{
     String clusterName = config.getString(ExecConstants.SERVICE_NAME);
     this.local = local;
 
-    final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_ASYNC;
+    final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_SYNC;
     GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
 
     if(!local){
@@ -209,23 +209,23 @@ public class ICache implements DistributedCache{
     }
 
     @Override
-    public void delete(K key) {
-      cache.remove(key);
+    public Future<V> delete(K key) {
+      return cache.removeAsync(key);
     }
 
     @Override
-    public void put(K key, V value) {
-      cache.put(key,  value);
+    public Future<V> put(K key, V value) {
+      return cache.putAsync(key, value);
     }
 
     @Override
-    public void putIfAbsent(K key, V value) {
-      cache.putIfAbsent(key,  value);
+    public Future<V> putIfAbsent(K key, V value) {
+      return cache.putIfAbsentAsync(key, value);
     }
 
     @Override
-    public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
-      cache.putIfAbsent(key, value, ttl, timeUnit);
+    public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
+      return cache.putIfAbsentAsync(key, value, ttl, timeUnit);
     }
 
   }
@@ -247,16 +247,52 @@ public class ICache implements DistributedCache{
     }
 
     @Override
-    public void put(K key, V value) {
-      cache.put(key, new DeltaList<V>(value));
+    public Future<Boolean> put(K key, V value) {
+      return new ICacheFuture(cache.putAsync(key, new DeltaList(value)));
+    }
+
+  }
+
+  public static class ICacheFuture implements Future<Boolean> {
+
+    Future future;
+
+    public ICacheFuture(Future future) {
+      this.future = future;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return future.cancel(mayInterruptIfRunning);
     }
 
+    @Override
+    public boolean isCancelled() {
+      return future.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+      return future.isDone();
+    }
+
+    @Override
+    public Boolean get() throws InterruptedException, ExecutionException {
+      future.get();
+      return true;
+    }
+
+    @Override
+    public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      future.get(timeout, unit);
+      return true;
+    }
   }
 
 
 
 
-  private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta{
+  private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta, List<V> {
 
     /** The serialVersionUID */
     private static final long serialVersionUID = 2176345973026460708L;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
index e61cd76..31ab909 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
@@ -29,8 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -235,8 +234,43 @@ public class LocalCache implements DistributedCache {
     }
 
     @Override
-    public void put(K key, V value) {
+    public Future<Boolean> put(K key, V value) {
       mmap.put(serialize(key, config.getMode()), serialize(value, config.getMode()));
+      return new LocalCacheFuture(true);
+    }
+  }
+
+  public static class LocalCacheFuture<V> implements Future<V> {
+
+    V value;
+
+    public LocalCacheFuture(V value) {
+      this.value = value;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+
+    @Override
+    public V get() throws InterruptedException, ExecutionException {
+      return value;
+    }
+
+    @Override
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return value;
     }
   }
 
@@ -268,25 +302,30 @@ public class LocalCache implements DistributedCache {
     }
 
     @Override
-    public void put(K key, V value) {
+    public Future<V> put(K key, V value) {
       m.put(serialize(key, config.getMode()), serialize(value, config.getMode()));
+      return new LocalCacheFuture(value);
     }
 
 
     @Override
-    public void putIfAbsent(K key, V value) {
+    public Future<V> putIfAbsent(K key, V value) {
       m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode()));
+      return new LocalCacheFuture(value);
     }
 
     @Override
-    public void delete(K key) {
+    public Future<V> delete(K key) {
+      V value = get(key);
       m.remove(serialize(key, config.getMode()));
+      return new LocalCacheFuture(value);
     }
 
     @Override
-    public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
+    public Future<V> putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
       m.putIfAbsent(serialize(key, config.getMode()), serialize(value, config.getMode()));
       logger.warn("Expiration not implemented in local map cache");
+      return new LocalCacheFuture<V>(value);
     }
 
     private class DeserializingTransformer implements Iterator<Map.Entry<K, V>> {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1568fc63/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 3048f77..1629f8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -21,13 +21,20 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.CachedVectorContainer;
 import org.apache.drill.exec.cache.DistributedCache.CacheConfig;
 import org.apache.drill.exec.cache.DistributedCache.SerializationMode;
 import org.apache.drill.exec.coord.DistributedSemaphore;
@@ -347,10 +354,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
       // store fragments in distributed grid.
       logger.debug("Storing fragments");
+      List<Future<PlanFragment>> queue = new LinkedList<>();
       for (PlanFragment f : work.getFragments()) {
         // store all fragments in grid since they are part of handshake.
 
-        context.getCache().getMap(FRAGMENT_CACHE).put(f.getHandle(), f);
+        queue.add(context.getCache().getMap(FRAGMENT_CACHE).put(f.getHandle(), f));
         if (f.getLeafFragment()) {
           leafFragments.add(f);
         } else {
@@ -358,9 +366,14 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
         }
       }
 
-      int totalFragments = 1 + intermediateFragments.size() + leafFragments.size();
-      fragmentManager.getStatus().setTotalFragments(totalFragments);
-      fragmentManager.getStatus().updateCache();
+      for (Future<PlanFragment> f : queue) {
+        try {
+          f.get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+          throw new ExecutionSetupException("failure while storing plan fragments", e);
+        }
+      }
+
       logger.debug("Fragments stored.");
 
       logger.debug("Submitting fragments to run.");
@@ -368,6 +381,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
       logger.debug("Fragments running.");
       state.updateState(QueryState.PENDING, QueryState.RUNNING);
+      int totalFragments = 1 + intermediateFragments.size() + leafFragments.size();
+      fragmentManager.getStatus().setTotalFragments(totalFragments);
+      fragmentManager.getStatus().updateCache();
 
     } catch (Exception e) {
       fail("Failure while setting up query.", e);