You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/11/18 11:15:01 UTC

[GitHub] [hive] dengzhhu653 commented on a change in pull request #2693: HIVE-25582: Empty result when using offset limit with MR

dengzhhu653 commented on a change in pull request #2693:
URL: https://github.com/apache/hive/pull/2693#discussion_r752141014



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
##########
@@ -18,82 +18,66 @@
 
 package org.apache.hadoop.hive.ql.exec.mr;
 
+import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
- * ObjectCache. No-op implementation on MR we don't have a means to reuse
- * Objects between runs of the same task.
+ * ObjectCache. Simple implementation for MR: since we don't have a means to reuse
+ * Objects between runs of the same task, this acts as a local cache.
  *
  */
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
 
   private static final Logger LOG = LoggerFactory.getLogger(ObjectCache.class.getName());
 
+  private final Map<String, Object> cache = new ConcurrentHashMap<>();
+
+  private static ExecutorService staticPool = Executors.newCachedThreadPool();
+
   @Override
   public void release(String key) {
     // nothing to do
     LOG.debug("{} no longer needed", key);
+    cache.remove(key);
   }
 
   @Override
   public <T> T retrieve(String key) throws HiveException {
-    return retrieve(key, null);
+    return (T) cache.get(key);
   }
 
   @Override
   public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+    T value = (T) cache.get(key);
+    if (value != null || fn == null) {
+      return value;
+    }
     try {
       LOG.debug("Creating {}", key);
-      return fn.call();
+      value = fn.call();
     } catch (Exception e) {
       throw new HiveException(e);
     }
+    T previous = (T) cache.putIfAbsent(key, value);
+    return previous != null ? previous : value;
   }
 
   @Override
   public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException {
-    final T value = retrieve(key, fn);
-
-    return new Future<T>() {
-
-      @Override
-      public boolean cancel(boolean mayInterruptIfRunning) {
-        return false;
-      }
-
-      @Override
-      public boolean isCancelled() {
-        return false;
-      }
-
-      @Override
-      public boolean isDone() {
-        return true;
-      }
-
-      @Override
-      public T get() throws InterruptedException, ExecutionException {
-        return value;
-      }
-
-      @Override
-      public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
-          TimeoutException {
-        return value;
-      }
-    };
+    return staticPool.submit((Callable)() -> retrieve(key, fn));

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org