You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/05/27 06:02:00 UTC

tajo git commit: TAJO-2162: An internal error occurs while running a query that includes 'order by'.

Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.4 7a4d55384 -> 69119f659


TAJO-2162: An internal error occurs while running a query that includes 'order by'.

Closes #1031


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/69119f65
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/69119f65
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/69119f65

Branch: refs/heads/branch-0.11.4
Commit: 69119f659bb83896c0114e1fe2689cc0079d61f7
Parents: 7a4d553
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 27 15:01:04 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 27 15:01:04 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  9 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |  2 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   |  2 +-
 .../tajo/pullserver/TajoPullServerService.java  | 90 ++++++++++++++++----
 4 files changed, 84 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7bb653d..14d0573 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,13 @@
 Tajo Change Log 
 
-Release 0.11.3 - unreleased
+Release 0.11.4 - unreleased
+
+  BUG FIXES
+
+    TAJO-2162: An internal error occurs while running a query that includes 'order by'.
+    (jihoon)
+
+Release 0.11.3 - released
 
   NEW FEATURES
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 0e597a5..605fdac 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -207,7 +207,7 @@ public class TajoConf extends Configuration {
     // Shuffle Configuration --------------------------------------------------
     PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
     PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")),
-    PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
+    PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-sec", 5 * 60, Validators.min("0")),
     PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
         Validators.min("1")),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),

http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 5e1798e..480ffdf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -795,7 +795,7 @@ public class TaskImpl implements Task {
             executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
 
         try {
-          FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last);
+          FileChunk chunk = TajoPullServerService.getFileChunks(conf, queryId, sid, path, startKey, endKey, last);
           chunkList.add(chunk);
         } catch (Throwable t) {
           LOG.error(t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 8bbfd18..6330bfa 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -59,6 +59,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NettyUtils;
 import org.apache.tajo.storage.*;
@@ -73,9 +74,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public class TajoPullServerService extends AbstractService {
@@ -115,6 +114,7 @@ public class TajoPullServerService extends AbstractService {
 
   private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
   private static int lowCacheHitCheckThreshold;
+  private static ScheduledExecutorService scheduledService;
 
   public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
     "tajo.pullserver.ssl.file.buffer.size";
@@ -274,10 +274,11 @@ public class TajoPullServerService extends AbstractService {
 
     int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
     int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
+    enforceBufferBeforeClose = cacheTimeout == 0;
 
     indexReaderCache = CacheBuilder.newBuilder()
         .maximumSize(cacheSize)
-        .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
+        .expireAfterAccess(cacheTimeout, TimeUnit.SECONDS)
         .removalListener(removalListener)
         .build(
             new CacheLoader<CacheKey, BSTIndexReader>() {
@@ -288,6 +289,9 @@ public class TajoPullServerService extends AbstractService {
             }
         );
     lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);
+    scheduledService = Executors.newSingleThreadScheduledExecutor();
+    final CloseBufferCleaner closer = new CloseBufferCleaner();
+    scheduledService.scheduleAtFixedRate(closer, 100, 1000, TimeUnit.MILLISECONDS);
 
     if (STANDALONE) {
       File pullServerPortFile = getPullServerPortFile();
@@ -368,6 +372,7 @@ public class TajoPullServerService extends AbstractService {
 
       localFS.close();
       indexReaderCache.invalidateAll();
+      scheduledService.shutdownNow();
     } catch (Throwable t) {
       LOG.error(t, t);
     } finally {
@@ -575,7 +580,7 @@ public class TajoPullServerService extends AbstractService {
 
           FileChunk chunk;
           try {
-            chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
+            chunk = getFileChunks(conf, queryId, sid, path, startKey, endKey, last);
           } catch (Throwable t) {
             LOG.error("ERROR Request: " + request.getUri(), t);
             sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
@@ -702,8 +707,8 @@ public class TajoPullServerService extends AbstractService {
         indexReaderCache.invalidateAll(removed);
       }
       removed.clear();
-      synchronized (waitForRemove) {
-        for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+      synchronized (readerCloseBuffer) {
+        for (Entry<CacheKey, BSTIndexReader> e : readerCloseBuffer.entrySet()) {
           CacheKey key = e.getKey();
           if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
             e.getValue().forceClose();
@@ -711,7 +716,7 @@ public class TajoPullServerService extends AbstractService {
           }
         }
         for (CacheKey eachKey : removed) {
-          waitForRemove.remove(eachKey);
+          readerCloseBuffer.remove(eachKey);
         }
       }
     }
@@ -778,37 +783,90 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
+  private static class CloseBufferCleaner implements Runnable {
+
+    @Override
+    public void run() {
+      List<BSTIndexReader> toClose = new ArrayList<>();
+      // Same lock as getFileChunks()/removalListener: a buffered reader cannot be revived mid-close.
+      synchronized (indexReaderCache) {
+        Iterator<Entry<CacheKey, BSTIndexReader>> it = readerCloseBuffer.entrySet().iterator();
+        while (it.hasNext()) {
+          BSTIndexReader reader = it.next().getValue();
+          if (reader.getReferenceNum() == 0) {
+            it.remove(); // detach under the lock so the entry, not just its key, is removed
+            toClose.add(reader);
+          }
+        }
+      }
+      for (BSTIndexReader reader : toClose) {
+        try {
+          reader.close();
+        } catch (IOException | RuntimeException e) {
+          // Never rethrow: an exception escaping a periodic task cancels all later executions.
+          LOG.error("Failed to close an index reader", e);
+        }
+      }
+    }
+  }
+
+  // If this is set to true, index readers are directly moved to readerCloseBuffer
+  // no matter what its reference number is.
+  // This is used for test with zero cache timeout. (See TajoConf.PULLSERVER_CACHE_TIMEOUT)
+  private static boolean enforceBufferBeforeClose = false;
+
   // Temporal space to wait for the completion of all index lookup operations
-  private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<CacheKey, BSTIndexReader> readerCloseBuffer = new ConcurrentHashMap<>();
 
   // RemovalListener is triggered when an item is removed from the index reader cache.
   // It closes index readers when they are not used anymore.
-  // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
-  private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = new RemovalListener<CacheKey, BSTIndexReader>() {
+  // If they are still being used, they are moved to readerCloseBuffer map to wait for other operations' completion.
+  private static final RemovalListener<CacheKey, BSTIndexReader> removalListener =
+      new RemovalListener<CacheKey, BSTIndexReader>() {
     @Override
     public void onRemoval(RemovalNotification<CacheKey, BSTIndexReader> removal) {
       BSTIndexReader reader = removal.getValue();
-      if (reader.getReferenceNum() == 0) {
+      if (!enforceBufferBeforeClose && reader.getReferenceNum() == 0) {
         try {
           reader.close(); // tear down properly
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-        waitForRemove.remove(removal.getKey());
       } else {
-        waitForRemove.put(removal.getKey(), reader);
+        // block changing the buffer while closing index readers in the buffer
+        synchronized (indexReaderCache) {
+          readerCloseBuffer.put(removal.getKey(), reader);
+        }
       }
     }
   };
 
-  public static FileChunk getFileChunks(String queryId,
+  public static FileChunk getFileChunks(TajoConf conf,
+                                        String queryId,
                                         String ebSeqId,
                                         Path outDir,
                                         String startKey,
                                         String endKey,
                                         boolean last) throws IOException, ExecutionException {
 
-    BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
+    BSTIndexReader idxReader;
+
+    if (indexReaderCache != null) {
+      CacheKey key = new CacheKey(outDir, queryId, ebSeqId);
+      synchronized (indexReaderCache) {
+        idxReader = readerCloseBuffer.remove(key);
+
+        if (idxReader != null) {
+          // Adding an element to the cache also can incur a removal of an element from the cache.
+          // So, the below line also should be synchronized.
+          indexReaderCache.put(key, idxReader);
+        } else {
+          idxReader = indexReaderCache.get(key);
+        }
+      }
+    } else {
+      idxReader = new BSTIndex(conf).getIndexReader(new Path(outDir, "index"));
+    }
     idxReader.retain();
 
     File data;