You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by si...@apache.org on 2015/03/17 05:51:03 UTC
tajo git commit: TAJO-1405: Fix some illegal way of usages on
connection pool. (Contributed by navis, Committed by Keuntae Park)
Repository: tajo
Updated Branches:
refs/heads/master 0dc7d6807 -> 286b95679
TAJO-1405: Fix some illegal way of usages on connection pool. (Contributed by navis, Committed by Keuntae Park)
Closes #425
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/286b9567
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/286b9567
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/286b9567
Branch: refs/heads/master
Commit: 286b956795a4dc2efb72c97896d86ed1049485e3
Parents: 0dc7d68
Author: Keuntae Park <si...@apache.org>
Authored: Tue Mar 17 13:47:20 2015 +0900
Committer: Keuntae Park <si...@apache.org>
Committed: Tue Mar 17 13:47:20 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../apache/tajo/client/SessionConnection.java | 18 +++--
.../tajo/worker/ExecutionBlockContext.java | 43 +++++++----
.../main/java/org/apache/tajo/worker/Task.java | 75 +++++++++++---------
.../java/org/apache/tajo/worker/TaskRunner.java | 9 ++-
.../org/apache/tajo/rpc/AsyncRpcClient.java | 9 ++-
.../org/apache/tajo/rpc/NettyClientBase.java | 6 ++
7 files changed, 103 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9d2cd14..0d7222f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,9 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1405: Fix some illegal way of usages on connection pool.
+ (Contributed by navis, Committed by Keuntae Park)
+
TAJO-1384: Duplicated output file path problem. (jihoon)
TAJO-1386: CURRENT_DATE generates parsing errors sometimes.
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index d24e7b3..c084d95 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -57,7 +57,7 @@ public class SessionConnection implements Closeable {
final RpcConnectionPool connPool;
- private final String baseDatabase;
+ private String baseDatabase;
private final UserRoleInfo userInfo;
@@ -260,7 +260,8 @@ public class SessionConnection implements Closeable {
}
public Boolean selectDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+ Boolean selected = new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
+ TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
checkSessionAndGet(client);
@@ -269,6 +270,11 @@ public class SessionConnection implements Closeable {
return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
}
}.withRetries();
+
+ if (selected == Boolean.TRUE) {
+ this.baseDatabase = databaseName;
+ }
+ return selected;
}
@Override
@@ -278,13 +284,15 @@ public class SessionConnection implements Closeable {
}
// remove session
+ NettyClientBase client = null;
try {
-
- NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+ client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
tajoMaster.removeSession(null, sessionId);
-
} catch (Throwable e) {
+ // ignore
+ } finally {
+ connPool.releaseConnection(client);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index a645689..2377720 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -36,13 +36,11 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
-import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
@@ -139,7 +137,17 @@ public class ExecutionBlockContext {
try{
this.resource.initialize(queryContext, plan);
} catch (Throwable e) {
- getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+ try {
+ NettyClientBase client = getQueryMasterConnection();
+ try {
+ QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+ stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+ } finally {
+ connPool.releaseConnection(client);
+ }
+ } catch (Throwable t) {
+ //ignore
+ }
throw e;
}
}
@@ -148,15 +156,13 @@ public class ExecutionBlockContext {
return resource;
}
- public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub()
+ public NettyClientBase getQueryMasterConnection()
throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
- NettyClientBase clientBase = null;
- try {
- clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
- return clientBase.getStub();
- } finally {
- connPool.releaseConnection(clientBase);
- }
+ return connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+ }
+
+ public void releaseConnection(NettyClientBase connection) {
+ connPool.releaseConnection(connection);
}
public void stop(){
@@ -267,7 +273,13 @@ public class ExecutionBlockContext {
}
private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
- getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
+ NettyClientBase client = getQueryMasterConnection();
+ try {
+ QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+ stub.doneExecutionBlock(null, reporter, NullCallback.get());
+ } finally {
+ connPool.releaseConnection(client);
+ }
}
protected void reportExecutionBlock(ExecutionBlockId ebId) {
@@ -361,12 +373,14 @@ public class ExecutionBlockContext {
return new Runnable() {
int remainingRetries = MAX_RETRIES;
- QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub;
@Override
public void run() {
while (!reporterStop.get() && !Thread.interrupted()) {
+
+ NettyClientBase client = null;
try {
- masterStub = getQueryMasterStub();
+ client = getQueryMasterConnection();
+ QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();
if(tasks.size() == 0){
masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
@@ -391,6 +405,7 @@ public class ExecutionBlockContext {
throw new RuntimeException(t);
}
} finally {
+ releaseConnection(client);
if (remainingRetries > 0 && !reporterStop.get()) {
synchronized (reporterThread) {
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 524b09b..9ff18dd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -52,6 +52,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
@@ -424,46 +425,52 @@ public class Task {
executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
- QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
- if (context.isStopped()) {
- context.setExecutorProgress(0.0f);
- if(context.getState() == TaskAttemptState.TA_KILLED) {
- queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
- executionBlockContext.killedTasksNum.incrementAndGet();
- } else {
- context.setState(TaskAttemptState.TA_FAILED);
- TaskFatalErrorReport.Builder errorBuilder =
- TaskFatalErrorReport.newBuilder()
- .setId(getId().getProto());
- if (error != null) {
- if (error.getMessage() == null) {
- errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
- } else {
- errorBuilder.setErrorMessage(error.getMessage());
+ NettyClientBase client = executionBlockContext.getQueryMasterConnection();
+ try {
+ QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
+ if (context.isStopped()) {
+ context.setExecutorProgress(0.0f);
+
+ if (context.getState() == TaskAttemptState.TA_KILLED) {
+ queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+ executionBlockContext.killedTasksNum.incrementAndGet();
+ } else {
+ context.setState(TaskAttemptState.TA_FAILED);
+ TaskFatalErrorReport.Builder errorBuilder =
+ TaskFatalErrorReport.newBuilder()
+ .setId(getId().getProto());
+ if (error != null) {
+ if (error.getMessage() == null) {
+ errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
+ } else {
+ errorBuilder.setErrorMessage(error.getMessage());
+ }
+ errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
}
- errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
+
+ queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+ executionBlockContext.failedTasksNum.incrementAndGet();
}
+ } else {
+ // if successful
+ context.setProgress(1.0f);
+ context.setState(TaskAttemptState.TA_SUCCEEDED);
+ executionBlockContext.succeededTasksNum.incrementAndGet();
- queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
- executionBlockContext.failedTasksNum.incrementAndGet();
+ TaskCompletionReport report = getTaskCompletionReport();
+ queryMasterStub.done(null, report, NullCallback.get());
}
- } else {
- // if successful
- context.setProgress(1.0f);
- context.setState(TaskAttemptState.TA_SUCCEEDED);
- executionBlockContext.succeededTasksNum.incrementAndGet();
-
- TaskCompletionReport report = getTaskCompletionReport();
- queryMasterStub.done(null, report, NullCallback.get());
+ finishTime = System.currentTimeMillis();
+ LOG.info(context.getTaskId() + " completed. " +
+ "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+ ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+ + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+ + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+ cleanupTask();
+ } finally {
+ executionBlockContext.releaseConnection(client);
}
- finishTime = System.currentTimeMillis();
- LOG.info(context.getTaskId() + " completed. " +
- "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
- ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
- + ", killed: " + executionBlockContext.killedTasksNum.intValue()
- + ", failed: " + executionBlockContext.failedTasksNum.intValue());
- cleanupTask();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 79725f6..058ea42 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -35,6 +35,7 @@ import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.container.TajoContainerIdPBImpl;
import org.apache.tajo.master.container.TajoConverterUtils;
import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import io.netty.channel.ConnectTimeoutException;
@@ -196,9 +197,9 @@ public class TaskRunner extends AbstractService {
TaskRequestProto taskRequest = null;
while(!stopped) {
- QueryMasterProtocolService.Interface qmClientService;
+ NettyClientBase client;
try {
- qmClientService = getContext().getQueryMasterStub();
+ client = executionBlockContext.getQueryMasterConnection();
} catch (ConnectTimeoutException ce) {
// NettyClientBase throws ConnectTimeoutException if connection was failed
stop();
@@ -212,6 +213,8 @@ public class TaskRunner extends AbstractService {
break;
}
+ QueryMasterProtocolService.Interface qmClientService = client.getStub();
+
try {
if (callFuture == null) {
callFuture = new CallFuture<TaskRequestProto>();
@@ -296,6 +299,8 @@ public class TaskRunner extends AbstractService {
}
} catch (Throwable t) {
LOG.fatal(t.getMessage(), t);
+ } finally {
+ executionBlockContext.releaseConnection(client);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 1ea9fb1..3d856ce 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -35,11 +35,12 @@ import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class AsyncRpcClient extends NettyClientBase {
private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
- private final Map<Integer, ResponseCallback> requests =
+ private final ConcurrentMap<Integer, ResponseCallback> requests =
new ConcurrentHashMap<Integer, ResponseCallback>();
private final Method stubMethod;
@@ -178,14 +179,12 @@ public class AsyncRpcClient extends NettyClientBase {
@ChannelHandler.Sharable
private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
- synchronized void registerCallback(int seqId, ResponseCallback callback) {
+ void registerCallback(int seqId, ResponseCallback callback) {
- if (requests.containsKey(seqId)) {
+ if (requests.putIfAbsent(seqId, callback) != null) {
throw new RemoteException(
getErrorMessage("Duplicate Sequence Id "+ seqId));
}
-
- requests.put(seqId, callback);
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 7dfc5a2..72278f2 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -132,6 +132,12 @@ public abstract class NettyClientBase implements Closeable {
final CountDownLatch ticket = new CountDownLatch(1);
final CountDownLatch granted = connect.check(ticket);
+ // basically, it's double checked lock
+ if (ticket == granted && isConnected()) {
+ granted.countDown();
+ return true;
+ }
+
if (ticket == granted) {
connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
}