You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/05/27 06:02:00 UTC
tajo git commit: TAJO-2162: Internal error occurs while running a
query including 'order by'.
Repository: tajo
Updated Branches:
refs/heads/branch-0.11.4 7a4d55384 -> 69119f659
TAJO-2162: Internal error occurs while running a query including 'order by'.
Closes #1031
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/69119f65
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/69119f65
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/69119f65
Branch: refs/heads/branch-0.11.4
Commit: 69119f659bb83896c0114e1fe2689cc0079d61f7
Parents: 7a4d553
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 27 15:01:04 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 27 15:01:04 2016 +0900
----------------------------------------------------------------------
CHANGES | 9 +-
.../java/org/apache/tajo/conf/TajoConf.java | 2 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 2 +-
.../tajo/pullserver/TajoPullServerService.java | 90 ++++++++++++++++----
4 files changed, 84 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7bb653d..14d0573 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,13 @@
Tajo Change Log
-Release 0.11.3 - unreleased
+Release 0.11.4 - unreleased
+
+ BUG FIXES
+
+ TAJO-2162: Internal error is occured while running query including 'order by'.
+ (jihoon)
+
+Release 0.11.3 - released
NEW FEATURES
http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 0e597a5..605fdac 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -207,7 +207,7 @@ public class TajoConf extends Configuration {
// Shuffle Configuration --------------------------------------------------
PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")),
- PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
+ PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-sec", 5 * 60, Validators.min("0")),
PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
Validators.min("1")),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 5e1798e..480ffdf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -795,7 +795,7 @@ public class TaskImpl implements Task {
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
try {
- FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last);
+ FileChunk chunk = TajoPullServerService.getFileChunks(conf, queryId, sid, path, startKey, endKey, last);
chunkList.add(chunk);
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/69119f65/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 8bbfd18..6330bfa 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -59,6 +59,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NettyUtils;
import org.apache.tajo.storage.*;
@@ -73,9 +74,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class TajoPullServerService extends AbstractService {
@@ -115,6 +114,7 @@ public class TajoPullServerService extends AbstractService {
private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
private static int lowCacheHitCheckThreshold;
+ private static ScheduledExecutorService scheduledService;
public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
"tajo.pullserver.ssl.file.buffer.size";
@@ -274,10 +274,11 @@ public class TajoPullServerService extends AbstractService {
int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
+ enforceBufferBeforeClose = cacheTimeout == 0;
indexReaderCache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
- .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
+ .expireAfterAccess(cacheTimeout, TimeUnit.SECONDS)
.removalListener(removalListener)
.build(
new CacheLoader<CacheKey, BSTIndexReader>() {
@@ -288,6 +289,9 @@ public class TajoPullServerService extends AbstractService {
}
);
lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);
+ scheduledService = Executors.newSingleThreadScheduledExecutor();
+ final CloseBufferCleaner closer = new CloseBufferCleaner();
+ scheduledService.scheduleAtFixedRate(closer, 100, 1000, TimeUnit.MILLISECONDS);
if (STANDALONE) {
File pullServerPortFile = getPullServerPortFile();
@@ -368,6 +372,7 @@ public class TajoPullServerService extends AbstractService {
localFS.close();
indexReaderCache.invalidateAll();
+ scheduledService.shutdownNow();
} catch (Throwable t) {
LOG.error(t, t);
} finally {
@@ -575,7 +580,7 @@ public class TajoPullServerService extends AbstractService {
FileChunk chunk;
try {
- chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
+ chunk = getFileChunks(conf, queryId, sid, path, startKey, endKey, last);
} catch (Throwable t) {
LOG.error("ERROR Request: " + request.getUri(), t);
sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
@@ -702,8 +707,8 @@ public class TajoPullServerService extends AbstractService {
indexReaderCache.invalidateAll(removed);
}
removed.clear();
- synchronized (waitForRemove) {
- for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+ synchronized (readerCloseBuffer) {
+ for (Entry<CacheKey, BSTIndexReader> e : readerCloseBuffer.entrySet()) {
CacheKey key = e.getKey();
if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
e.getValue().forceClose();
@@ -711,7 +716,7 @@ public class TajoPullServerService extends AbstractService {
}
}
for (CacheKey eachKey : removed) {
- waitForRemove.remove(eachKey);
+ readerCloseBuffer.remove(eachKey);
}
}
}
@@ -778,37 +783,90 @@ public class TajoPullServerService extends AbstractService {
}
}
+ private static class CloseBufferCleaner implements Runnable {
+
+ @Override
+ public void run() {
+ List<CacheKey> removeKeys = new ArrayList<>();
+ Set<Entry<CacheKey, BSTIndexReader>> entrySet;
+ entrySet = readerCloseBuffer.entrySet();
+
+ for (Entry<CacheKey, BSTIndexReader> entry : entrySet) {
+ if (entry.getValue().getReferenceNum() == 0) {
+ try {
+ entry.getValue().close();
+ removeKeys.add(entry.getKey());
+ } catch (IOException e) {
+ throw new TajoInternalError(e);
+ }
+ }
+ }
+
+ synchronized (readerCloseBuffer) {
+ for (CacheKey key : removeKeys) {
+ readerCloseBuffer.remove(key);
+ }
+ }
+ }
+ }
+
+ // If this is set to true, index readers are directly moved to readerCloseBuffer
+ // no matter what its reference number is.
+ // This is used for test with zero cache timeout. (See TajoConf.PULLSERVER_CACHE_TIMEOUT)
+ private static boolean enforceBufferBeforeClose = false;
+
// Temporal space to wait for the completion of all index lookup operations
- private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<CacheKey, BSTIndexReader> readerCloseBuffer = new ConcurrentHashMap<>();
// RemovalListener is triggered when an item is removed from the index reader cache.
// It closes index readers when they are not used anymore.
- // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
- private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = new RemovalListener<CacheKey, BSTIndexReader>() {
+ // If they are still being used, they are moved to readerCloseBuffer map to wait for other operations' completion.
+ private static final RemovalListener<CacheKey, BSTIndexReader> removalListener =
+ new RemovalListener<CacheKey, BSTIndexReader>() {
@Override
public void onRemoval(RemovalNotification<CacheKey, BSTIndexReader> removal) {
BSTIndexReader reader = removal.getValue();
- if (reader.getReferenceNum() == 0) {
+ if (!enforceBufferBeforeClose && reader.getReferenceNum() == 0) {
try {
reader.close(); // tear down properly
} catch (IOException e) {
throw new RuntimeException(e);
}
- waitForRemove.remove(removal.getKey());
} else {
- waitForRemove.put(removal.getKey(), reader);
+ // block changing the buffer while closing index readers in the buffer
+ synchronized (indexReaderCache) {
+ readerCloseBuffer.put(removal.getKey(), reader);
+ }
}
}
};
- public static FileChunk getFileChunks(String queryId,
+ public static FileChunk getFileChunks(TajoConf conf,
+ String queryId,
String ebSeqId,
Path outDir,
String startKey,
String endKey,
boolean last) throws IOException, ExecutionException {
- BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
+ BSTIndexReader idxReader;
+
+ if (indexReaderCache != null) {
+ CacheKey key = new CacheKey(outDir, queryId, ebSeqId);
+ synchronized (indexReaderCache) {
+ idxReader = readerCloseBuffer.remove(key);
+
+ if (idxReader != null) {
+ // Adding an element to the cache also can incur a removal of an element from the cache.
+ // So, the below line also should be synchronized.
+ indexReaderCache.put(key, idxReader);
+ } else {
+ idxReader = indexReaderCache.get(key);
+ }
+ }
+ } else {
+ idxReader = new BSTIndex(conf).getIndexReader(new Path(outDir, "index"));
+ }
idxReader.retain();
File data;