You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/03/12 11:18:00 UTC
tajo git commit: TAJO-1394 Support reconnect on tsql.
Repository: tajo
Updated Branches:
refs/heads/master 607fdea45 -> 4a9da73c6
TAJO-1394 Support reconnect on tsql.
closes #414
Signed-off-by: Hyunsik Choi <hy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4a9da73c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4a9da73c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4a9da73c
Branch: refs/heads/master
Commit: 4a9da73c6cc6ff670f867c13d60dd951bf8190bc
Parents: 607fdea
Author: navis.ryu <na...@apache.org>
Authored: Thu Mar 12 10:57:53 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Mar 12 02:46:09 2015 -0700
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/cli/tsql/TajoCli.java | 61 +++++++++++---------
.../apache/tajo/client/SessionConnection.java | 60 +++++++++++++++++++
3 files changed, 97 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f330a0a..39f0fc4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1394: Support reconnect on tsql.
+ (Contributed by navis, Committed by hyunsik)
+
TAJO-527: Upgrade to Netty 4. (jihun)
TAJO-1369: Some stack trace information is missed in error/fail logging.
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index 7d7d0bd..354f60d 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -41,10 +41,7 @@ import java.io.*;
import java.lang.reflect.Constructor;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
public class TajoCli {
public static final String ERROR_PREFIX = "ERROR: ";
@@ -60,6 +57,8 @@ public class TajoCli {
private final PrintWriter sout;
private TajoFileHistory history;
+ private final boolean reconnect; // reconnect on invalid session
+
// Current States
private String currentDatabase;
@@ -99,6 +98,7 @@ public class TajoCli {
options.addOption("B", "background", false, "execute as background process");
options.addOption("conf", "conf", true, "configuration value");
options.addOption("param", "param", true, "parameter value in SQL file");
+ options.addOption("reconnect", "reconnect", false, "reconnect on invalid session");
options.addOption("help", "help", false, "help");
}
@@ -208,6 +208,8 @@ public class TajoCli {
processConfVarCommand(cmd.getOptionValues("conf"));
}
+ this.reconnect = cmd.hasOption("reconnect");
+
// if there is no "-h" option,
if(hostName == null) {
if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
@@ -467,13 +469,8 @@ public class TajoCli {
try {
invoked.invoke(arguments);
- } catch (IllegalArgumentException ige) {
- displayFormatter.printErrorMessage(sout, ige);
- wasError = true;
- return -1;
} catch (Exception e) {
- displayFormatter.printErrorMessage(sout, e);
- wasError = true;
+ onError(null, e);
return -1;
} finally {
context.getOutput().flush();
@@ -492,8 +489,7 @@ public class TajoCli {
long startTime = System.currentTimeMillis();
ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json);
if (response == null) {
- displayFormatter.printErrorMessage(sout, "response is null");
- wasError = true;
+ onError("response is null", null);
} else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
if (response.getIsForwarded()) {
QueryId queryId = new QueryId(response.getQueryId());
@@ -508,8 +504,7 @@ public class TajoCli {
}
} else {
if (response.hasErrorMessage()) {
- displayFormatter.printErrorMessage(sout, response.getErrorMessage());
- wasError = true;
+ onError(response.getErrorMessage(), null);
}
}
}
@@ -520,17 +515,12 @@ public class TajoCli {
ClientProtos.SubmitQueryResponse response = null;
try{
response = client.executeQuery(statement);
- } catch (ServiceException e){
- displayFormatter.printErrorMessage(sout, e.getMessage());
- wasError = true;
} catch(Throwable te){
- displayFormatter.printErrorMessage(sout, te);
- wasError = true;
+ onError(null, te);
}
if (response == null) {
- displayFormatter.printErrorMessage(sout, "response is null");
- wasError = true;
+ onError("response is null", null);
} else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
if (response.getIsForwarded()) {
QueryId queryId = new QueryId(response.getQueryId());
@@ -544,8 +534,7 @@ public class TajoCli {
}
} else {
if (response.hasErrorMessage()) {
- displayFormatter.printErrorMessage(sout, response.getErrorMessage());
- wasError = true;
+ onError(response.getErrorMessage(), null);
}
}
@@ -569,13 +558,13 @@ public class TajoCli {
displayFormatter.printResult(sout, sin, desc, responseTime, res);
}
} catch (Throwable t) {
- displayFormatter.printErrorMessage(sout, t);
- wasError = true;
+ onError(null, t);
} finally {
if (res != null) {
try {
res.close();
} catch (SQLException e) {
+ // ignore
}
}
}
@@ -637,8 +626,7 @@ public class TajoCli {
}
}
} catch (Throwable t) {
- displayFormatter.printErrorMessage(sout, t);
- wasError = true;
+ onError(null, t);
} finally {
if (res != null) {
try {
@@ -668,6 +656,25 @@ public class TajoCli {
sout.println("Invalid command " + command + ". Try \\? for help.");
}
+ private void onError(String message, Throwable t) {
+ wasError = true;
+ if (t == null) {
+ displayFormatter.printErrorMessage(sout, message);
+ } else {
+ displayFormatter.printErrorMessage(sout, t);
+ }
+ if (reconnect && (t instanceof InvalidClientSessionException ||
+ (message != null && message.startsWith("org.apache.tajo.session.InvalidSessionException")))) {
+ if (client instanceof SessionConnection) {
+ try {
+ ((SessionConnection)client).reconnect();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ }
+
@VisibleForTesting
public void close() {
//for testcase
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index bcf6d8b..d05d3b1 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,6 +21,7 @@ package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.auth.UserRoleInfo;
@@ -319,6 +320,65 @@ public class SessionConnection implements Closeable {
}
}
+ public boolean reconnect() throws Exception {
+ return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+ public Boolean call(NettyClientBase client) throws ServiceException {
+ CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+ builder.setUsername(userInfo.getUserName()).build();
+ if (baseDatabase != null) {
+ builder.setBaseDatabaseName(baseDatabase);
+ }
+
+
+ // create new session
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
+ if (response.getResultCode() != ResultCode.OK) {
+ return false;
+ }
+
+ // Invalidate some session variables in client cache
+ sessionId = response.getSessionId();
+ Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+ synchronized (sessionVarsCache) {
+ for (SessionVars var : UPDATE_ON_RECONNECT) {
+ String value = sessionVars.get(var.keyname());
+ if (value != null) {
+ sessionVarsCache.put(var.keyname(), value);
+ }
+ }
+ }
+
+ // Update the session variables in server side
+ try {
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.putAll(sessionVarsCache);
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .setSessionVars(keyValueSet.getProto()).build();
+
+ if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
+ }
+ LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+ return true;
+ } catch (ServiceException e) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
+ }
+ }
+ }.withRetries();
+ }
+
+ /**
+ * Session variables which should be updated upon reconnecting
+ */
+ private static final SessionVars[] UPDATE_ON_RECONNECT = new SessionVars[] {
+ SessionVars.SESSION_ID, SessionVars.SESSION_LAST_ACCESS_TIME, SessionVars.CLIENT_HOST
+ };
+
ClientProtos.SessionedStringProto convertSessionedString(String str) {
ClientProtos.SessionedStringProto.Builder builder = ClientProtos.SessionedStringProto.newBuilder();
builder.setSessionId(sessionId);