You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/03/12 11:18:00 UTC

tajo git commit: TAJO-1394 Support reconnect on tsql.

Repository: tajo
Updated Branches:
  refs/heads/master 607fdea45 -> 4a9da73c6


TAJO-1394 Support reconnect on tsql.

closes #414

Signed-off-by: Hyunsik Choi <hy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4a9da73c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4a9da73c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4a9da73c

Branch: refs/heads/master
Commit: 4a9da73c6cc6ff670f867c13d60dd951bf8190bc
Parents: 607fdea
Author: navis.ryu <na...@apache.org>
Authored: Thu Mar 12 10:57:53 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Mar 12 02:46:09 2015 -0700

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/cli/tsql/TajoCli.java  | 61 +++++++++++---------
 .../apache/tajo/client/SessionConnection.java   | 60 +++++++++++++++++++
 3 files changed, 97 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f330a0a..39f0fc4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1394: Support reconnect on tsql. 
+    (Contributed by navis, Committed by hyunsik)
+
     TAJO-527: Upgrade to Netty 4. (jihun)
 
     TAJO-1369: Some stack trace information is missed in error/fail logging. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index 7d7d0bd..354f60d 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -41,10 +41,7 @@ import java.io.*;
 import java.lang.reflect.Constructor;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 
 public class TajoCli {
   public static final String ERROR_PREFIX = "ERROR: ";
@@ -60,6 +57,8 @@ public class TajoCli {
   private final PrintWriter sout;
   private TajoFileHistory history;
 
+  private final boolean reconnect;  // reconnect on invalid session
+
   // Current States
   private String currentDatabase;
 
@@ -99,6 +98,7 @@ public class TajoCli {
     options.addOption("B", "background", false, "execute as background process");
     options.addOption("conf", "conf", true, "configuration value");
     options.addOption("param", "param", true, "parameter value in SQL file");
+    options.addOption("reconnect", "reconnect", false, "reconnect on invalid session");
     options.addOption("help", "help", false, "help");
   }
 
@@ -208,6 +208,8 @@ public class TajoCli {
       processConfVarCommand(cmd.getOptionValues("conf"));
     }
 
+    this.reconnect = cmd.hasOption("reconnect");
+
     // if there is no "-h" option,
     if(hostName == null) {
       if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
@@ -467,13 +469,8 @@ public class TajoCli {
 
       try {
         invoked.invoke(arguments);
-      } catch (IllegalArgumentException ige) {
-        displayFormatter.printErrorMessage(sout, ige);
-        wasError = true;
-        return -1;
       } catch (Exception e) {
-        displayFormatter.printErrorMessage(sout, e);
-        wasError = true;
+        onError(null, e);
         return -1;
       } finally {
         context.getOutput().flush();
@@ -492,8 +489,7 @@ public class TajoCli {
     long startTime = System.currentTimeMillis();
     ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json);
     if (response == null) {
-      displayFormatter.printErrorMessage(sout, "response is null");
-      wasError = true;
+      onError("response is null", null);
     } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
       if (response.getIsForwarded()) {
         QueryId queryId = new QueryId(response.getQueryId());
@@ -508,8 +504,7 @@ public class TajoCli {
       }
     } else {
       if (response.hasErrorMessage()) {
-        displayFormatter.printErrorMessage(sout, response.getErrorMessage());
-        wasError = true;
+        onError(response.getErrorMessage(), null);
       }
     }
   }
@@ -520,17 +515,12 @@ public class TajoCli {
     ClientProtos.SubmitQueryResponse response = null;
     try{
       response = client.executeQuery(statement);
-    } catch (ServiceException e){
-      displayFormatter.printErrorMessage(sout, e.getMessage());
-      wasError = true;
     } catch(Throwable te){
-      displayFormatter.printErrorMessage(sout, te);
-      wasError = true;
+      onError(null, te);
     }
 
     if (response == null) {
-      displayFormatter.printErrorMessage(sout, "response is null");
-      wasError = true;
+      onError("response is null", null);
     } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
       if (response.getIsForwarded()) {
         QueryId queryId = new QueryId(response.getQueryId());
@@ -544,8 +534,7 @@ public class TajoCli {
       }
     } else {
       if (response.hasErrorMessage()) {
-        displayFormatter.printErrorMessage(sout, response.getErrorMessage());
-        wasError = true;
+        onError(response.getErrorMessage(), null);
       }
     }
 
@@ -569,13 +558,13 @@ public class TajoCli {
         displayFormatter.printResult(sout, sin, desc, responseTime, res);
       }
     } catch (Throwable t) {
-      displayFormatter.printErrorMessage(sout, t);
-      wasError = true;
+      onError(null, t);
     } finally {
       if (res != null) {
         try {
           res.close();
         } catch (SQLException e) {
+          // ignore
         }
       }
     }
@@ -637,8 +626,7 @@ public class TajoCli {
         }
       }
     } catch (Throwable t) {
-      displayFormatter.printErrorMessage(sout, t);
-      wasError = true;
+      onError(null, t);
     } finally {
       if (res != null) {
         try {
@@ -668,6 +656,25 @@ public class TajoCli {
     sout.println("Invalid command " + command + ". Try \\? for help.");
   }
 
+  private void onError(String message, Throwable t) {
+    wasError = true;
+    if (t == null) {
+      displayFormatter.printErrorMessage(sout, message);
+    } else {
+      displayFormatter.printErrorMessage(sout, t);
+    }
+    if (reconnect && (t instanceof InvalidClientSessionException ||
+        (message != null && message.startsWith("org.apache.tajo.session.InvalidSessionException")))) {
+      if (client instanceof SessionConnection) {
+        try {
+          ((SessionConnection)client).reconnect();
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   public void close() {
     //for testcase

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index bcf6d8b..d05d3b1 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,6 +21,7 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.auth.UserRoleInfo;
@@ -319,6 +320,65 @@ public class SessionConnection implements Closeable {
     }
   }
 
+  /**
+   * Creates a new session on the master (with retries) and replays the cached session variables onto it.
+   * @return true if the session was created and its variables were updated, false otherwise
+   */
+  public boolean reconnect() throws Exception {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+        builder.setUsername(userInfo.getUserName());
+        if (baseDatabase != null) {
+          builder.setBaseDatabaseName(baseDatabase);
+        }
+
+        // create new session
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
+        if (response.getResultCode() != ResultCode.OK) {
+          return false;
+        }
+
+        // Refresh the per-session variables in the client cache and snapshot the cache under the same lock
+        sessionId = response.getSessionId();
+        Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+        KeyValueSet keyValueSet = new KeyValueSet();
+        synchronized (sessionVarsCache) {
+          for (SessionVars var : UPDATE_ON_RECONNECT) {
+            String value = sessionVars.get(var.keyname());
+            if (value != null) {
+              sessionVarsCache.put(var.keyname(), value);
+            }
+          }
+          keyValueSet.putAll(sessionVarsCache);
+        }
+
+        // Update the session variables in server side
+        try {
+          ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+              .setSessionId(sessionId)
+              .setSessionVars(keyValueSet.getProto()).build();
+          if (tajoMasterService.updateSessionVariables(null, request).getResultCode() == ResultCode.OK) {
+            LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+            return true;
+          }
+        } catch (ServiceException e) {
+          LOG.warn("Failed to update session variables while reconnecting", e);
+        }
+        tajoMasterService.removeSession(null, sessionId);  // drop the session whose variables could not be set
+        return false;
+      }
+    }.withRetries();
+  }
+
+  /**
+   * Session variables whose cached client-side values are replaced with the new session's values on reconnect.
+   */
+  private static final SessionVars[] UPDATE_ON_RECONNECT = {
+    SessionVars.SESSION_ID, SessionVars.SESSION_LAST_ACCESS_TIME, SessionVars.CLIENT_HOST
+  };
+
   ClientProtos.SessionedStringProto convertSessionedString(String str) {
     ClientProtos.SessionedStringProto.Builder builder = ClientProtos.SessionedStringProto.newBuilder();
     builder.setSessionId(sessionId);