You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/05/20 20:46:05 UTC
[07/48] git commit: TAJO-734: Arrange TajoCli output message.
(hyoungjunkim via jihoon)
TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bc1a3235
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bc1a3235
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bc1a3235
Branch: refs/heads/window_function
Commit: bc1a3235176fd5d09b0093bc5b1ff03cdca63dab
Parents: fe81035
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Apr 23 14:15:43 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Apr 23 14:15:43 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +-
.../tajo/cli/DefaultTajoCliOutputFormatter.java | 183 +++++++++++++++++++
.../main/java/org/apache/tajo/cli/TajoCli.java | 173 +++++++-----------
.../apache/tajo/cli/TajoCliOutputFormatter.java | 98 ++++++++++
.../org/apache/tajo/client/QueryStatus.java | 9 +
.../java/org/apache/tajo/client/TajoClient.java | 4 +-
tajo-client/src/main/proto/ClientProtos.proto | 6 +-
.../java/org/apache/tajo/conf/TajoConf.java | 6 +-
.../org/apache/tajo/master/GlobalEngine.java | 5 +-
.../tajo/master/TajoMasterClientService.java | 1 +
.../apache/tajo/master/querymaster/Query.java | 5 +-
.../tajo/master/querymaster/QueryInfo.java | 2 +-
.../querymaster/QueryMasterManagerService.java | 6 +-
.../master/querymaster/QueryMasterTask.java | 23 ++-
.../tajo/master/querymaster/QueryUnit.java | 11 +-
.../master/querymaster/QueryUnitAttempt.java | 10 +-
.../tajo/master/querymaster/SubQuery.java | 2 +-
.../tajo/worker/TajoWorkerClientService.java | 11 +-
.../main/java/org/apache/tajo/worker/Task.java | 15 +-
.../src/main/proto/TajoWorkerProtocol.proto | 3 +-
.../tajo/cli/TestDefaultCliOutputFormatter.java | 139 ++++++++++++++
.../org/apache/tajo/rpc/BlockingRpcClient.java | 32 ++--
.../org/apache/tajo/rpc/BlockingRpcServer.java | 2 -
.../org/apache/tajo/rpc/NettyClientBase.java | 16 +-
.../apache/tajo/rpc/RemoteCallException.java | 9 +-
.../org/apache/tajo/rpc/RpcChannelFactory.java | 4 +-
.../apache/tajo/rpc/TajoServiceException.java | 58 ++++++
tajo-rpc/src/main/proto/RpcProtos.proto | 4 +-
.../org/apache/tajo/rpc/TestBlockingRpc.java | 41 ++++-
29 files changed, 714 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ce3e4b3..d7a17d3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,7 +5,7 @@ Release 0.9.0 - unreleased
IMPROVEMENT
TAJO-769: A minor improvements for HCatalogStore (Fengdong Yu via hyunsik)
-
+ TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon)
SUB TASKS
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
new file mode 100644
index 0000000..d291414
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+
+public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter {
+ private TajoConf tajoConf;
+ private int printPauseRecords;
+ private boolean printPause;
+ private boolean printErrorTrace;
+
+ @Override
+ public void init(TajoConf tajoConf) {
+ this.tajoConf = tajoConf;
+
+ this.printPause = tajoConf.getBoolVar(TajoConf.ConfVars.CLI_PRINT_PAUSE);
+ this.printPauseRecords = tajoConf.getIntVar(TajoConf.ConfVars.CLI_PRINT_PAUSE_NUM_RECORDS);
+ this.printErrorTrace = tajoConf.getBoolVar(TajoConf.ConfVars.CLI_PRINT_ERROR_TRACE);
+ }
+
+ @Override
+ public void setScirptMode() {
+ this.printPause = false;
+ }
+
+ private String getQuerySuccessMessage(TableDesc tableDesc, float responseTime, int totalPrintedRows, String postfix) {
+ TableStats stat = tableDesc.getStats();
+ String volume = FileUtil.humanReadableByteCount(stat.getNumBytes(), false);
+ long resultRows = stat.getNumRows();
+
+ long realNumRows = resultRows != 0 ? resultRows : totalPrintedRows;
+ return "(" + realNumRows + " rows, " + responseTime + " sec, " + volume + " " + postfix + ")";
+ }
+
+ @Override
+ public void printResult(PrintWriter sout, InputStream sin, TableDesc tableDesc,
+ float responseTime, ResultSet res) throws Exception {
+ long resultRows = tableDesc.getStats().getNumRows();
+ if (resultRows == 0) {
+ resultRows = Integer.MAX_VALUE;
+ }
+
+ if (res == null) {
+ sout.println(getQuerySuccessMessage(tableDesc, responseTime, 0, "inserted"));
+ return;
+ }
+ ResultSetMetaData rsmd = res.getMetaData();
+ int numOfColumns = rsmd.getColumnCount();
+ for (int i = 1; i <= numOfColumns; i++) {
+ if (i > 1) sout.print(", ");
+ String columnName = rsmd.getColumnName(i);
+ sout.print(columnName);
+ }
+ sout.println("\n-------------------------------");
+
+ int numOfPrintedRows = 0;
+ int totalPrintedRows = 0;
+ while (res.next()) {
+ for (int i = 1; i <= numOfColumns; i++) {
+ if (i > 1) sout.print(", ");
+ String columnValue = res.getObject(i).toString();
+ if(res.wasNull()){
+ sout.print("null");
+ } else {
+ sout.print(columnValue);
+ }
+ }
+ sout.println();
+ sout.flush();
+ numOfPrintedRows++;
+ totalPrintedRows++;
+ if (printPause && printPauseRecords > 0 && totalPrintedRows < resultRows && numOfPrintedRows >= printPauseRecords) {
+ if (resultRows < Integer.MAX_VALUE) {
+ sout.print("(" + totalPrintedRows + "/" + resultRows + " rows, continue... 'q' is quit)");
+ } else {
+ sout.print("(" + totalPrintedRows + " rows, continue... 'q' is quit)");
+ }
+ sout.flush();
+ if (sin != null) {
+ if (sin.read() == 'q') {
+ sout.println();
+ break;
+ }
+ }
+ numOfPrintedRows = 0;
+ sout.println();
+ }
+ }
+ sout.println(getQuerySuccessMessage(tableDesc, responseTime, totalPrintedRows, "selected"));
+ }
+
+ @Override
+ public void printNoResult(PrintWriter sout) {
+ sout.println("(0 rows)");
+ }
+
+ @Override
+ public void printProgress(PrintWriter sout, QueryStatus status) {
+ sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
+ + "%, response time: " + ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0) + " sec");
+ sout.flush();
+ }
+
+ @Override
+ public void printMessage(PrintWriter sout, String message) {
+ sout.println(message);
+ }
+
+ @Override
+ public void printErrorMessage(PrintWriter sout, Throwable t) {
+ sout.println(parseErrorMessage(t.getMessage()));
+ if (printErrorTrace) {
+ sout.println(ExceptionUtils.getStackTrace(t));
+ }
+ }
+
+ @Override
+ public void printErrorMessage(PrintWriter sout, String message) {
+ sout.println(parseErrorMessage(message));
+ }
+
+ @Override
+ public void printKilledMessage(PrintWriter sout, QueryId queryId) {
+ sout.println(TajoCli.KILL_PREFIX + queryId);
+ }
+
+ @Override
+ public void printErrorMessage(PrintWriter sout, QueryStatus status) {
+ if (status.getErrorMessage() != null && !status.getErrorMessage().isEmpty()) {
+ printErrorMessage(sout, parseErrorMessage(status.getErrorMessage()));
+ } else {
+ printErrorMessage(sout, "No error message");
+ }
+ if (printErrorTrace && status.getErrorTrace() != null && !status.getErrorTrace().isEmpty()) {
+ sout.println(status.getErrorTrace());
+ }
+ }
+
+ public static String parseErrorMessage(String message) {
+ if (message == null) {
+ return TajoCli.ERROR_PREFIX + "No error message";
+ }
+ String[] lines = message.split("\n");
+ message = lines[0];
+
+ int index = message.lastIndexOf(TajoCli.ERROR_PREFIX);
+ if (index < 0) {
+ message = TajoCli.ERROR_PREFIX + message;
+ } else {
+ message = message.substring(index);
+ }
+
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 508b8bb..606ca88 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -35,7 +35,6 @@ import org.apache.tajo.util.FileUtil;
import java.io.*;
import java.lang.reflect.Constructor;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
@@ -47,6 +46,8 @@ import static org.apache.tajo.cli.ParsedResult.StatementType.STATEMENT;
import static org.apache.tajo.cli.SimpleParser.ParsingState;
public class TajoCli {
+ public static final String ERROR_PREFIX = "ERROR: ";
+ public static final String KILL_PREFIX = "KILL: ";
private final TajoConf conf;
private TajoClient client;
@@ -61,6 +62,8 @@ public class TajoCli {
// Current States
private String currentDatabase;
+ private TajoCliOutputFormatter outputFormatter;
+
private static final Class [] registeredCommands = {
DescTableCommand.class,
DescFunctionCommand.class,
@@ -78,7 +81,6 @@ public class TajoCli {
};
private final Map<String, TajoShellCommand> commands = new TreeMap<String, TajoShellCommand>();
- public static final int PRINT_LIMIT = 24;
private static final Options options;
private static final String HOME_DIR = System.getProperty("user.home");
private static final String HISTORY_FILE = ".tajo_history";
@@ -121,6 +123,11 @@ public class TajoCli {
this.reader = new ConsoleReader(sin, out);
this.reader.setExpandEvents(false);
this.sout = new PrintWriter(reader.getOutput());
+ Class formatterClass = conf.getClass(conf.getVar(ConfVars.CLI__OUTPUT_FORMATTER_CLASS),
+ DefaultTajoCliOutputFormatter.class);
+
+ this.outputFormatter = (TajoCliOutputFormatter)formatterClass.newInstance();
+ this.outputFormatter.init(conf);
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
@@ -160,7 +167,7 @@ public class TajoCli {
}
if ((hostName == null) ^ (port == null)) {
- System.err.println("ERROR: cannot find valid Tajo server address");
+ System.err.println(ERROR_PREFIX + "cannot find valid Tajo server address");
throw new RuntimeException("cannot find valid Tajo server address");
} else if (hostName != null && port != null) {
conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
@@ -175,11 +182,13 @@ public class TajoCli {
initCommands();
if (cmd.hasOption("c")) {
+ outputFormatter.setScirptMode();
executeScript(cmd.getOptionValue("c"));
sout.flush();
System.exit(0);
}
if (cmd.hasOption("f")) {
+ outputFormatter.setScirptMode();
File sqlFile = new File(cmd.getOptionValue("f"));
if (sqlFile.exists()) {
String script = FileUtil.readTextFile(new File(cmd.getOptionValue("f")));
@@ -187,7 +196,7 @@ public class TajoCli {
sout.flush();
System.exit(0);
} else {
- System.err.println("No such a file \"" + cmd.getOptionValue("f") + "\"");
+ System.err.println(ERROR_PREFIX + "No such a file \"" + cmd.getOptionValue("f") + "\"");
System.exit(-1);
}
}
@@ -202,10 +211,10 @@ public class TajoCli {
history = new TajoFileHistory(new File(historyPath));
reader.setHistory(history);
} else {
- System.err.println("ERROR: home directory : '" + HOME_DIR +"' does not exist.");
+ System.err.println(ERROR_PREFIX + "home directory : '" + HOME_DIR +"' does not exist.");
}
} catch (Exception e) {
- System.err.println(e.getMessage());
+ System.err.println(ERROR_PREFIX + e.getMessage());
}
}
@@ -296,9 +305,9 @@ public class TajoCli {
try {
invoked.invoke(arguments);
} catch (IllegalArgumentException ige) {
- sout.println(ige.getMessage());
+ outputFormatter.printErrorMessage(sout, ige);
} catch (Exception e) {
- sout.println(e.getMessage());
+ outputFormatter.printErrorMessage(sout, e);
}
}
@@ -306,44 +315,43 @@ public class TajoCli {
}
private void executeQuery(String statement) throws ServiceException {
+ long startTime = System.currentTimeMillis();
ClientProtos.SubmitQueryResponse response = client.executeQuery(statement);
if (response == null) {
- sout.println("response is null");
+ outputFormatter.printErrorMessage(sout, "response is null");
} else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
if (response.getIsForwarded()) {
QueryId queryId = new QueryId(response.getQueryId());
- try {
- waitForQueryCompleted(queryId);
- } finally {
- client.closeQuery(queryId);
- }
+ waitForQueryCompleted(queryId);
} else {
if (!response.hasTableDesc() && !response.hasResultSet()) {
- sout.println("Ok");
+ outputFormatter.printMessage(sout, "OK");
} else {
-
- ResultSet resultSet;
- int numBytes;
- long maxRowNum;
- try {
- resultSet = TajoClient.createResultSet(client, response);
- if (response.hasTableDesc()) {
- numBytes = 0;
- } else {
- numBytes = response.getResultSet().getBytesNum();
- }
- maxRowNum = response.getMaxRowNum();
- printResult(resultSet, maxRowNum, numBytes);
- } catch (IOException ioe) {
- sout.println(ioe.getMessage());
- } catch (SQLException sqe) {
- sout.println(sqe.getMessage());
- }
+ localQueryCompleted(response, startTime);
}
}
} else {
if (response.hasErrorMessage()) {
- sout.println(response.getErrorMessage());
+ outputFormatter.printErrorMessage(sout, response.getErrorMessage());
+ }
+ }
+ }
+
+ private void localQueryCompleted(ClientProtos.SubmitQueryResponse response, long startTime) {
+ ResultSet res = null;
+ try {
+ res = TajoClient.createResultSet(client, response);
+ float responseTime = ((float)(System.currentTimeMillis() - startTime) / 1000.0f);
+ TableDesc desc = new TableDesc(response.getTableDesc());
+ outputFormatter.printResult(sout, sin, desc, responseTime, res);
+ } catch (Throwable t) {
+ outputFormatter.printErrorMessage(sout, t);
+ } finally {
+ if (res != null) {
+ try {
+ res.close();
+ } catch (SQLException e) {
+ }
}
}
}
@@ -355,9 +363,10 @@ public class TajoCli {
}
// query execute
+ ResultSet res = null;
+ QueryStatus status = null;
try {
- QueryStatus status;
int initRetries = 0;
int progressRetries = 0;
while (true) {
@@ -370,9 +379,7 @@ public class TajoCli {
}
if (status.getState() == QueryState.QUERY_RUNNING || status.getState() == QueryState.QUERY_SUCCEEDED) {
- sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
- + "%, response time: " + ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0) + " sec");
- sout.flush();
+ outputFormatter.printProgress(sout, status);
}
if (status.getState() != QueryState.QUERY_RUNNING &&
@@ -385,91 +392,37 @@ public class TajoCli {
}
}
- if (status.getState() == QueryState.QUERY_ERROR) {
- sout.println("Internal error!");
- if(status.getErrorMessage() != null && !status.getErrorMessage().isEmpty()) {
- sout.println(status.getErrorMessage());
- }
- } else if (status.getState() == QueryState.QUERY_FAILED) {
- sout.println("Query failed!");
+ if (status.getState() == QueryState.QUERY_ERROR || status.getState() == QueryState.QUERY_FAILED) {
+ outputFormatter.printErrorMessage(sout, status);
} else if (status.getState() == QueryState.QUERY_KILLED) {
- sout.println(queryId + " is killed.");
+ outputFormatter.printKilledMessage(sout, queryId);
} else {
if (status.getState() == QueryState.QUERY_SUCCEEDED) {
- sout.println("final state: " + status.getState()
- + ", response time: " + (((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0)
- + " sec"));
+ float responseTime = ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0f);
+ ClientProtos.GetQueryResultResponse response = client.getResultResponse(queryId);
if (status.hasResult()) {
- ClientProtos.GetQueryResultResponse response = client.getResultResponse(queryId);
- ResultSet res = TajoClient.createResultSet(client, queryId, response);
+ res = TajoClient.createResultSet(client, queryId, response);
TableDesc desc = new TableDesc(response.getTableDesc());
- long totalRowNum = desc.getStats().getNumRows();
- long totalBytes = desc.getStats().getNumBytes();
- printResult(res, totalRowNum, totalBytes);
+ outputFormatter.printResult(sout, sin, desc, responseTime, res);
} else {
- sout.println("OK");
+ TableDesc desc = new TableDesc(response.getTableDesc());
+ outputFormatter.printResult(sout, sin, desc, responseTime, res);
}
}
}
} catch (Throwable t) {
- t.printStackTrace();
- System.err.println(t.getMessage());
- }
- }
-
- private void printResult(ResultSet res, long rowNum, long numBytes) throws IOException, SQLException {
- try {
- if (res == null) {
- sout.println("OK");
- return;
- }
-
- ResultSetMetaData rsmd = res.getMetaData();
-
- String volume = FileUtil.humanReadableByteCount(numBytes, false);
- String rowNumStr = rowNum == Integer.MAX_VALUE ? "unknown" : rowNum + "";
- sout.println("result: " + rowNumStr + " rows (" + volume + ")");
-
- int numOfColumns = rsmd.getColumnCount();
- for (int i = 1; i <= numOfColumns; i++) {
- if (i > 1) sout.print(", ");
- String columnName = rsmd.getColumnName(i);
- sout.print(columnName);
- }
- sout.println("\n-------------------------------");
-
- int numOfPrintedRows = 0;
- while (res.next()) {
- // TODO - to be improved to print more formatted text
- for (int i = 1; i <= numOfColumns; i++) {
- if (i > 1) sout.print(", ");
- String columnValue = res.getObject(i).toString();
- if(res.wasNull()){
- sout.print("null");
- } else {
- sout.print(columnValue);
- }
+ outputFormatter.printErrorMessage(sout, t);
+ } finally {
+ if (res != null) {
+ try {
+ res.close();
+ } catch (SQLException e) {
}
- sout.println();
- sout.flush();
- numOfPrintedRows++;
- if (numOfPrintedRows >= PRINT_LIMIT) {
- sout.print("continue... ('q' is quit)");
- sout.flush();
- if (sin.read() == 'q') {
- sout.println();
- break;
- }
- numOfPrintedRows = 0;
- sout.println();
+ } else {
+ if (status != null && status.getQueryId() != null) {
+ client.closeQuery(status.getQueryId());
}
}
- } catch (SQLException e) {
- e.printStackTrace();
- } finally {
- if(res != null) {
- res.close();
- }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java
new file mode 100644
index 0000000..0e91669
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.sql.ResultSet;
+
+public interface TajoCliOutputFormatter {
+ /**
+ * Initialize formatter
+ * @param tajoConf
+ */
+ public void init(TajoConf tajoConf);
+
+ /**
+ * print query result to console
+ * @param sout
+ * @param sin
+ * @param tableDesc
+ * @param responseTime
+ * @param res
+ * @throws Exception
+ */
+ public void printResult(PrintWriter sout, InputStream sin, TableDesc tableDesc,
+ float responseTime, ResultSet res) throws Exception;
+
+ /**
+ * print no result message
+ * @param sout
+ */
+ public void printNoResult(PrintWriter sout);
+
+ /**
+ * print simple message
+ * @param sout
+ * @param message
+ */
+ public void printMessage(PrintWriter sout, String message);
+
+ /**
+ * print query progress message
+ * @param sout
+ * @param status
+ */
+ public void printProgress(PrintWriter sout, QueryStatus status);
+
+ /**
+ * print error message
+ * @param sout
+ * @param t
+ */
+ public void printErrorMessage(PrintWriter sout, Throwable t);
+
+ /**
+ * print error message
+ * @param sout
+ * @param message
+ */
+ public void printErrorMessage(PrintWriter sout, String message);
+
+ /**
+ * print error message
+ * @param sout
+ * @param queryId
+ */
+ public void printKilledMessage(PrintWriter sout, QueryId queryId);
+
+ /**
+ * print query status error message
+ * @param sout
+ * @param status
+ */
+ void printErrorMessage(PrintWriter sout, QueryStatus status);
+
+ void setScirptMode();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
index 203f9aa..4a38934 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
@@ -30,6 +30,7 @@ public class QueryStatus {
private long finishTime;
private boolean hasResult;
private String errorText;
+ private String errorTrace;
private String queryMasterHost;
private int queryMasterPort;
@@ -43,6 +44,9 @@ public class QueryStatus {
if (proto.hasErrorMessage()) {
errorText = proto.getErrorMessage();
}
+ if (proto.hasErrorTrace()) {
+ errorTrace = proto.getErrorTrace();
+ }
queryMasterHost = proto.getQueryMasterHost();
queryMasterPort = proto.getQueryMasterPort();
@@ -83,4 +87,9 @@ public class QueryStatus {
public String getErrorMessage() {
return errorText;
}
+
+ public String getErrorTrace() {
+ return errorTrace;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 81fc227..3c85662 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -180,7 +180,9 @@ public class TajoClient implements Closeable {
CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
if (response.getState() == CreateSessionResponse.ResultState.SUCCESS) {
sessionId = response.getSessionId();
- LOG.info(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+ }
} else {
throw new InvalidClientSessionException(response.getMessage());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 3baeee9..bdc271e 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -134,6 +134,7 @@ message SubmitQueryResponse {
optional int32 maxRowNum = 9;
optional string errorMessage = 10;
+ optional string errorTrace = 11;
}
message GetQueryStatusResponse {
@@ -145,8 +146,9 @@ message GetQueryStatusResponse {
optional int64 finishTime = 7;
optional bool hasResult = 8;
optional string errorMessage = 9;
- optional string queryMasterHost = 10;
- optional int32 queryMasterPort = 11;
+ optional string errorTrace = 10;
+ optional string queryMasterHost = 11;
+ optional int32 queryMasterPort = 12;
}
message GetClusterInfoRequest {
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 5b3d4b3..552f1e4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -245,7 +245,11 @@ public class TajoConf extends Configuration {
METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties"),
//CLI
- CLI_MAX_COLUMN("tajo.cli.max_columns", 120)
+ CLI_MAX_COLUMN("tajo.cli.max_columns", 120),
+ CLI_PRINT_PAUSE_NUM_RECORDS("tajo.cli.print.pause.num.records", 100),
+ CLI_PRINT_PAUSE("tajo.cli.print.pause", true),
+ CLI_PRINT_ERROR_TRACE("tajo.cli.print.error.trace", true),
+ CLI__OUTPUT_FORMATTER_CLASS("tajo.cli.otuptu.formatter", "org.apache.tajo.cli.DefaultTajoCliOutputFormatter");
;
public final String varname;
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8acf2b2..35b8ab8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -34,7 +34,6 @@ import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
@@ -60,7 +59,6 @@ import java.util.ArrayList;
import java.util.List;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
@@ -156,9 +154,10 @@ public class GlobalEngine extends AbstractService {
responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
String errorMessage = t.getMessage();
if (t.getMessage() == null) {
- errorMessage = StringUtils.stringifyException(t);
+ errorMessage = t.getClass().getName();
}
responseBuilder.setErrorMessage(errorMessage);
+ responseBuilder.setErrorTrace(StringUtils.stringifyException(t));
return responseBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index c968a73..c6facb1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -481,6 +481,7 @@ public class TajoMasterClientService extends AbstractService {
new QueryInfo(queryId)));
return BOOL_TRUE;
} catch (Throwable t) {
+ t.printStackTrace();
throw new ServiceException(t);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index a8f5b31..6a5248d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -164,8 +164,9 @@ public class Query implements EventHandler<QueryEvent> {
QueryEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able transitions
- .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
- EnumSet.of(QueryEventType.KILL))
+ .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
+ QueryEventType.KILL,
+ QUERY_COMPLETED_TRANSITION)
// Transitions from FAILED state
.addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index b077b36..9e455ae 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -119,7 +119,7 @@ public class QueryInfo {
@Override
public String toString() {
- return queryId.toString() + "state=" + queryState +",progress=" + progress + ", queryMaster="
+ return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster="
+ getQueryMasterHost();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index bf59e9f..43c85d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -186,7 +186,11 @@ public class QueryMasterManagerService extends CompositeService
try {
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
- queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
+ if (queryMasterTask != null) {
+ queryMasterTask.handleTaskFailed(report);
+ } else {
+ LOG.warn("No QueryMasterTask: " + new QueryUnitAttemptId(report.getId()));
+ }
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 39ea430..4a14359 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -48,6 +48,7 @@ import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.GlobalEngine;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoContainerProxy;
@@ -64,8 +65,7 @@ import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -115,6 +115,9 @@ public class QueryMasterTask extends CompositeService {
private Throwable initError;
+ private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
+ new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
+
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
QueryId queryId, Session session, QueryContext queryContext, String sql,
String logicalPlanJson) {
@@ -218,6 +221,22 @@ public class QueryMasterTask extends CompositeService {
query.getSubQuery(id).handleTaskRequestEvent(event);
}
+ public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
+ synchronized(diagnostics) {
+ if (diagnostics.size() < 10) {
+ diagnostics.add(report);
+ }
+ }
+
+ getEventHandler().handle(new TaskFatalErrorEvent(report));
+ }
+
+ public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
+ synchronized(diagnostics) {
+ return Collections.unmodifiableCollection(diagnostics);
+ }
+ }
+
private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
public void handle(SubQueryEvent event) {
ExecutionBlockId id = event.getSubQueryId();
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 42fbf8a..34686da 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -549,13 +549,16 @@ public class QueryUnit implements EventHandler<TaskEvent> {
@Override
public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
- LOG.info("=============================================================");
- LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
- LOG.info("=============================================================");
task.failedAttempts++;
task.finishedAttempts++;
+ boolean retry = task.failedAttempts < task.maxAttempts;
+
+ LOG.info("====================================================================================");
+ LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " +
+ "retry:" + retry + ", attempts:" + task.failedAttempts + " <<<");
+ LOG.info("====================================================================================");
- if (task.failedAttempts < task.maxAttempts) {
+ if (retry) {
if (task.successfulAttempt == null) {
task.addAndScheduleAttempt();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 7993ce9..b69742c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -18,6 +18,7 @@
package org.apache.tajo.master.querymaster;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -97,6 +98,9 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
.addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
TaskAttemptEventType.TA_KILL,
new KillTaskTransition())
+ .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_KILL,
+ new KillTaskTransition())
.addTransition(TaskAttemptState.TA_ASSIGNED,
EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
@@ -155,6 +159,9 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
.addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
EnumSet.of(
TaskAttemptEventType.TA_UPDATE))
+ .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_LOCAL_KILLED,
+ new TaskKilledCompleteTransition())
.installTopology();
@@ -383,6 +390,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
} catch (Throwable t) {
taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
+ taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
}
}
}
@@ -402,7 +410,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
- LOG.error("FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
+ LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 8929e8d..31c0efa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -1057,7 +1057,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
}
- LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d",
+ LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
subQuery.getId(),
subQuery.getTotalScheduledObjectsCount(),
subQuery.succeededObjectCount,
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 937d886..2b947fe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -33,6 +33,7 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.Query;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.rpc.BlockingRpcServer;
@@ -41,6 +42,7 @@ import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collection;
public class TajoWorkerClientService extends AbstractService {
private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
@@ -201,9 +203,12 @@ public class TajoWorkerClientService extends AbstractService {
} else {
builder.setFinishTime(System.currentTimeMillis());
}
- } else {
- builder.setState(queryMasterTask.getState());
- builder.setErrorMessage(queryMasterTask.getErrorMessage());
+ }
+ Collection<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = queryMasterTask.getDiagnostics();
+ if(!diagnostics.isEmpty()) {
+ TajoWorkerProtocol.TaskFatalErrorReport firstError = diagnostics.iterator().next();
+ builder.setErrorMessage(firstError.getErrorMessage());
+ builder.setErrorTrace(firstError.getErrorTrace());
}
}
return builder.build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 30f56ee..ef52fd0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -365,7 +365,7 @@ public class Task {
public void run() {
startTime = System.currentTimeMillis();
- String errorMessage = null;
+ Exception error = null;
try {
context.setState(TaskAttemptState.TA_RUNNING);
@@ -381,6 +381,7 @@ public class Task {
this.executor = taskRunnerContext.getTQueryEngine().
createPlan(context, plan);
this.executor.init();
+
while(!killed && executor.next() != null) {
}
this.executor.close();
@@ -388,9 +389,8 @@ public class Task {
this.executor = null;
}
} catch (Exception e) {
- // errorMessage will be sent to master.
- errorMessage = ExceptionUtils.getStackTrace(e);
- LOG.error(errorMessage);
+ error = e ;
+ LOG.error(e.getMessage(), e);
aborted = true;
} finally {
context.setProgress(1.0f);
@@ -409,8 +409,9 @@ public class Task {
TaskFatalErrorReport.Builder errorBuilder =
TaskFatalErrorReport.newBuilder()
.setId(getId().getProto());
- if (errorMessage != null) {
- errorBuilder.setErrorMessage(errorMessage);
+ if (error != null) {
+ errorBuilder.setErrorMessage(error.getMessage());
+ errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
}
masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
@@ -444,7 +445,7 @@ public class Task {
finishTime = System.currentTimeMillis();
cleanupTask();
- LOG.info("Task Counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+ LOG.info("Worker's task counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+ ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 7c94e33..78da10f 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -57,7 +57,8 @@ message TaskCompletionReport {
message TaskFatalErrorReport {
required QueryUnitAttemptIdProto id = 1;
- optional string error_message = 2;
+ optional string errorMessage = 2;
+ optional string errorTrace = 3;
}
message QueryUnitRequestProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java
new file mode 100644
index 0000000..e13eeef
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Float8Datum;
+import org.apache.tajo.datum.Int4Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.jdbc.MetaDataTuple;
+import org.apache.tajo.jdbc.TajoMetaDataResultSet;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestDefaultCliOutputFormatter {
+ @Test
+ public void testParseErrorMessage() {
+ String message = "java.sql.SQLException: ERROR: no such a table: table1";
+ assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(message));
+
+ String multiLineMessage =
+ "ERROR: java.sql.SQLException: ERROR: no such a table: table1\n" +
+ "com.google.protobuf.ServiceException: java.sql.SQLException: ERROR: no such a table: table1\n" +
+ "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:107)\n" +
+ "\tat org.apache.tajo.client.TajoClient.getTableDesc(TajoClient.java:777)\n" +
+ "\tat org.apache.tajo.cli.DescTableCommand.invoke(DescTableCommand.java:43)\n" +
+ "\tat org.apache.tajo.cli.TajoCli.executeMetaCommand(TajoCli.java:300)\n" +
+ "\tat org.apache.tajo.cli.TajoCli.executeParsedResults(TajoCli.java:280)\n" +
+ "\tat org.apache.tajo.cli.TajoCli.runShell(TajoCli.java:271)\n" +
+ "\tat org.apache.tajo.cli.TajoCli.main(TajoCli.java:420)\n" +
+ "Caused by: java.sql.SQLException: ERROR: no such a table: table1\n" +
+ "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:791)\n" +
+ "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:778)\n" +
+ "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:97)\n" +
+ "\t... 6 more";
+
+ assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(multiLineMessage));
+ }
+
+ @Test
+ public void testPrintResultInsertStatement() throws Exception {
+ TajoConf tajoConf = new TajoConf();
+ DefaultTajoCliOutputFormatter outputFormatter = new DefaultTajoCliOutputFormatter();
+ outputFormatter.init(tajoConf);
+
+ float responseTime = 10.1f;
+ long numBytes = 102;
+ long numRows = 30;
+
+ TableDesc tableDesc = new TableDesc();
+ TableStats stats = new TableStats();
+ stats.setNumBytes(102);
+ stats.setNumRows(numRows);
+ tableDesc.setStats(stats);
+
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter writer = new PrintWriter(stringWriter);
+ outputFormatter.printResult(writer, null, tableDesc, responseTime, null);
+
+ String expectedOutput = "(" + numRows + " rows, " + responseTime + " sec, " + numBytes + " B inserted)\n";
+ assertEquals(expectedOutput, stringWriter.toString());
+ }
+
+ @Test
+ public void testPrintResultSelectStatement() throws Exception {
+ TajoConf tajoConf = new TajoConf();
+ DefaultTajoCliOutputFormatter outputFormatter = new DefaultTajoCliOutputFormatter();
+ outputFormatter.init(tajoConf);
+
+ float responseTime = 10.1f;
+ long numBytes = 102;
+ long numRows = 30;
+
+ TableDesc tableDesc = new TableDesc();
+ TableStats stats = new TableStats();
+ stats.setNumBytes(102);
+ stats.setNumRows(numRows);
+ tableDesc.setStats(stats);
+
+ final List<MetaDataTuple> resultTables = new ArrayList<MetaDataTuple>();
+
+ String expectedOutput = "col1, col2, col3\n";
+ expectedOutput += "-------------------------------\n";
+
+ String prefix = "";
+ for (int i = 0; i < numRows; i++) {
+ MetaDataTuple tuple = new MetaDataTuple(3);
+
+ int index = 0;
+
+ tuple.put(index++, new TextDatum("row_" + i));
+ tuple.put(index++, new Int4Datum(i));
+ tuple.put(index++, new Float8Datum(i));
+
+ expectedOutput += prefix + "row_" + i + ", " + (new Int4Datum(i)) + ", " + (new Float8Datum(i));
+ prefix = "\n";
+ resultTables.add(tuple);
+ }
+ expectedOutput += "\n(" + numRows + " rows, " + responseTime + " sec, " + numBytes + " B selected)\n";
+
+ ResultSet resultSet = new TajoMetaDataResultSet(
+ Arrays.asList("col1", "col2", "col3"),
+ Arrays.asList(TajoDataTypes.Type.TEXT, TajoDataTypes.Type.INT4, TajoDataTypes.Type.FLOAT8),
+ resultTables);
+
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter writer = new PrintWriter(stringWriter);
+ outputFormatter.printResult(writer, null, tableDesc, responseTime, resultSet);
+
+ assertEquals(expectedOutput, stringWriter.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 604ed52..3d6989a 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -117,11 +117,12 @@ public class BlockingRpcClient extends NettyClientBase {
}
}
+ @Override
public Message callBlockingMethod(final MethodDescriptor method,
final RpcController controller,
final Message param,
final Message responsePrototype)
- throws ServiceException {
+ throws TajoServiceException {
int nextSeqId = sequence.getAndIncrement();
@@ -135,12 +136,13 @@ public class BlockingRpcClient extends NettyClientBase {
try {
return callFuture.get();
} catch (Throwable t) {
- if(t instanceof ExecutionException) {
- ExecutionException ee = (ExecutionException)t;
- throw new ServiceException(ee.getCause());
- } else {
- throw new RemoteException(t);
+ if (t instanceof ExecutionException) {
+ Throwable cause = t.getCause();
+ if (cause != null && cause instanceof TajoServiceException) {
+ throw (TajoServiceException)cause;
+ }
}
+ throw new TajoServiceException(t.getMessage());
}
}
@@ -161,14 +163,23 @@ public class BlockingRpcClient extends NettyClientBase {
private String getErrorMessage(String message) {
if(protocol != null && getChannel() != null) {
- return "Exception [" + protocol.getCanonicalName() +
+ return protocol.getName() +
"(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
- getChannel().getRemoteAddress()) + ")]: " + message;
+ getChannel().getRemoteAddress()) + "): " + message;
} else {
return "Exception " + message;
}
}
+ private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
+ if(protocol != null && getChannel() != null) {
+ return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
+ NetUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
+ } else {
+ return new TajoServiceException(response.getErrorMessage());
+ }
+ }
+
private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
@Override
@@ -183,9 +194,8 @@ public class BlockingRpcClient extends NettyClientBase {
} else {
if (rpcResponse.hasErrorMessage()) {
callback.setFailed(rpcResponse.getErrorMessage(),
- new ServiceException(getErrorMessage(rpcResponse.getErrorMessage())));
- throw new RemoteException(
- getErrorMessage(rpcResponse.getErrorMessage()));
+ makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
+ throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
} else {
Message responseMessage;
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 067d824..9e0d57c 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -125,8 +125,6 @@ public class BlockingRpcServer extends NettyServerBase {
RemoteCallException callException = (RemoteCallException) e.getCause();
e.getChannel().write(callException.getResponse());
}
-
- throw new RemoteException(e.getCause());
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 8f49e17..fa4b941 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -57,20 +57,21 @@ public abstract class NettyClientBase implements Closeable {
this.bootstrap.setOption("keepAlive", true);
connect(addr);
- } catch (Throwable t) {
+ } catch (IOException e) {
close();
+ throw e;
+ } catch (Throwable t) {
throw new IOException("Connect error to " + addr + " cause " + t.getMessage(), t.getCause());
}
}
- public void connect(InetSocketAddress addr) {
+ public void connect(InetSocketAddress addr) throws Exception {
if(addr.isUnresolved()){
addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
}
this.channelFuture = bootstrap.connect(addr);
this.channelFuture.awaitUninterruptibly();
if (!channelFuture.isSuccess()) {
- channelFuture.getCause().printStackTrace();
throw new RuntimeException(channelFuture.getCause());
}
}
@@ -80,6 +81,9 @@ public abstract class NettyClientBase implements Closeable {
}
public InetSocketAddress getRemoteAddress() {
+ if (channelFuture == null || channelFuture.getChannel() == null) {
+ return null;
+ }
return (InetSocketAddress) channelFuture.getChannel().getRemoteAddress();
}
@@ -100,9 +104,9 @@ public abstract class NettyClientBase implements Closeable {
if(this.bootstrap != null) {
// This line will shutdown the factory
// this.bootstrap.releaseExternalResources();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Proxy is disconnected from " +
- getRemoteAddress().getHostName() + ":" + getRemoteAddress().getPort());
+ InetSocketAddress address = getRemoteAddress();
+ if (address != null) {
+ LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort());
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
index 6504e69..949aa58 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -27,11 +27,15 @@ import java.io.Writer;
public class RemoteCallException extends RemoteException {
private int seqId;
+ private String originExceptionClass;
public RemoteCallException(int seqId, MethodDescriptor methodDesc,
Throwable t) {
super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t);
this.seqId = seqId;
+ if (t != null) {
+ originExceptionClass = t.getClass().getCanonicalName();
+ }
}
public RemoteCallException(int seqId, Throwable t) {
@@ -42,7 +46,10 @@ public class RemoteCallException extends RemoteException {
public RpcResponse getResponse() {
RpcResponse.Builder builder = RpcResponse.newBuilder();
builder.setId(seqId);
- builder.setErrorMessage(getStackTraceString(getCause()));
+ builder.setErrorMessage(getCause().getMessage());
+ builder.setErrorTrace(getStackTraceString(getCause()));
+ builder.setErrorClass(originExceptionClass);
+
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index adafd5c..6274eff 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -96,7 +96,9 @@ public final class RpcChannelFactory {
if(LOG.isDebugEnabled()) {
LOG.debug("Shutdown Shared RPC Pool");
}
- factory.releaseExternalResources();
+ if (factory != null) {
+ factory.releaseExternalResources();
+ }
factory = null;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
new file mode 100644
index 0000000..113d181
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+public class TajoServiceException extends ServiceException {
+ private String traceMessage;
+ private String protocol;
+ private String remoteAddress;
+
+ public TajoServiceException(String message) {
+ super(message);
+ }
+ public TajoServiceException(String message, String traceMessage) {
+ super(message);
+ this.traceMessage = traceMessage;
+ }
+
+ public TajoServiceException(String message, Throwable cause, String protocol, String remoteAddress) {
+ super(message, cause);
+
+ this.protocol = protocol;
+ this.remoteAddress = remoteAddress;
+ }
+
+ public String getTraceMessage() {
+ if(traceMessage == null && getCause() != null){
+ this.traceMessage = ExceptionUtils.getStackTrace(getCause());
+ }
+ return traceMessage;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public String getRemoteAddress() {
+ return remoteAddress;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/proto/RpcProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/RpcProtos.proto b/tajo-rpc/src/main/proto/RpcProtos.proto
index 2dac58d..69f43ed 100644
--- a/tajo-rpc/src/main/proto/RpcProtos.proto
+++ b/tajo-rpc/src/main/proto/RpcProtos.proto
@@ -26,5 +26,7 @@ message RpcRequest {
message RpcResponse {
required int32 id = 1;
optional bytes response_message = 2;
- optional string error_message = 3;
+ optional string error_class = 3;
+ optional string error_message = 4;
+ optional string error_trace = 5;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index fe8685a..7acede6 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -30,6 +30,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,7 +39,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class TestBlockingRpc {
- public static String MESSAGE = "TestBlockingRpc";
+ public static final String MESSAGE = "TestBlockingRpc";
private BlockingRpcServer server;
private BlockingRpcClient client;
@@ -73,7 +75,7 @@ public class TestBlockingRpc {
.setX3(3.15d)
.setX4(2.0f).build();
SumResponse response1 = stub.sum(null, request);
- assertTrue(8.15d == response1.getResult());
+ assertEquals(8.15d, response1.getResult(), 1e-15);
EchoMessage message = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
@@ -100,7 +102,7 @@ public class TestBlockingRpc {
}
}.withRetries();
- assertTrue(8.15d == response.getResult());
+ assertEquals(8.15d, response.getResult(), 1e-15);
response =
new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
@@ -117,6 +119,39 @@ public class TestBlockingRpc {
}
@Test
+ public void testThrowException() throws Exception {
+ EchoMessage message = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+
+ try {
+ stub.throwException(null, message);
+ fail("RpcCall should throw exception");
+ } catch (Throwable t) {
+ assertTrue(t instanceof TajoServiceException);
+ assertEquals("Exception Test", t.getMessage());
+ TajoServiceException te = (TajoServiceException)t;
+ assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol());
+ assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(),
+ te.getRemoteAddress());
+ }
+ }
+
+ @Test
+ public void testConnectionFailed() throws Exception {
+ try {
+ int port = server.getListenAddress().getPort() + 1;
+ new BlockingRpcClient(DummyProtocol.class,
+ NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)));
+ fail("Connection should be failed.");
+ } catch (Throwable t) {
+ assertTrue(t instanceof IOException);
+ assertNotNull(t.getCause());
+ assertTrue(t.getCause() instanceof ConnectException);
+ assertTrue(t.getCause().getMessage().indexOf("Connection refused") >= 0);
+ }
+ }
+
+ @Test
public void testGetNull() throws Exception {
assertNull(stub.getNull(null, null));
assertTrue(service.getNullCalled);