You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/04 03:31:28 UTC
[3/4] tajo git commit: TAJO-1699: Tajo Java Client version 2.
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 4900188..a97cb33 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -26,6 +26,9 @@ import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.annotation.NotNull;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.client.v2.exception.ClientConnectionException;
+import org.apache.tajo.exception.NoSuchSessionVariableException;
import org.apache.tajo.exception.SQLExceptionUtil;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -36,7 +39,6 @@ import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse;
@@ -57,14 +59,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE;
-import static org.apache.tajo.exception.ReturnStateUtil.isError;
-import static org.apache.tajo.exception.ReturnStateUtil.isSuccess;
-import static org.apache.tajo.exception.ReturnStateUtil.isThisError;
-import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException;
+import static org.apache.tajo.error.Errors.ResultCode.UNDEFINED_DATABASE;
+import static org.apache.tajo.exception.ReturnStateUtil.*;
import static org.apache.tajo.exception.SQLExceptionUtil.throwIfError;
+import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException;
import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
import static org.apache.tajo.ipc.ClientProtos.CreateSessionResponse;
-import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
public class SessionConnection implements Closeable {
@@ -101,7 +101,7 @@ public class SessionConnection implements Closeable {
* @throws SQLException
*/
public SessionConnection(@NotNull ServiceTracker tracker, @Nullable String baseDatabase,
- @NotNull KeyValueSet properties) throws SQLException {
+ @NotNull KeyValueSet properties) {
this.serviceTracker = tracker;
this.baseDatabase = baseDatabase;
this.properties = properties;
@@ -117,7 +117,7 @@ public class SessionConnection implements Closeable {
return Collections.unmodifiableMap(sessionVarsCache);
}
- public synchronized NettyClientBase getTajoMasterConnection() throws SQLException {
+ public synchronized NettyClientBase getTajoMasterConnection() {
if (client != null && client.isConnected()) {
return client;
@@ -138,14 +138,14 @@ public class SessionConnection implements Closeable {
connections.incrementAndGet();
} catch (Throwable t) {
- throw SQLExceptionUtil.makeUnableToEstablishConnection(t);
+ throw new ClientConnectionException(t);
}
return client;
}
}
- protected BlockingInterface getTMStub() throws SQLException {
+ protected BlockingInterface getTMStub() {
NettyClientBase tmClient;
tmClient = getTajoMasterConnection();
BlockingInterface stub = tmClient.getStub();
@@ -185,7 +185,7 @@ public class SessionConnection implements Closeable {
return userInfo;
}
- public String getCurrentDatabase() throws SQLException {
+ public String getCurrentDatabase() {
NettyClientBase client = getTajoMasterConnection();
checkSessionAndGet(client);
@@ -198,11 +198,11 @@ public class SessionConnection implements Closeable {
throw new RuntimeException(e);
}
- throwIfError(response.getState());
+ ensureOk(response.getState());
return response.getValue();
}
- public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws SQLException {
+ public Map<String, String> updateSessionVariables(final Map<String, String> variables) {
NettyClientBase client = getTajoMasterConnection();
checkSessionAndGet(client);
@@ -221,15 +221,12 @@ public class SessionConnection implements Closeable {
throw new RuntimeException(e);
}
- if (isSuccess(response.getState())) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw toSQLException(response.getState());
- }
+ ensureOk(response.getState());
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
}
- public Map<String, String> unsetSessionVariables(final List<String> variables) throws SQLException {
+ public Map<String, String> unsetSessionVariables(final List<String> variables) throws NoSuchSessionVariableException {
final BlockingInterface stub = getTMStub();
final UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder()
@@ -244,12 +241,13 @@ public class SessionConnection implements Closeable {
throw new RuntimeException(e);
}
- if (isSuccess(response.getState())) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw toSQLException(response.getState());
+ if (isThisError(response.getState(), NO_SUCH_SESSION_VARIABLE)) {
+ throw new NoSuchSessionVariableException(response.getState());
}
+
+ ensureOk(response.getState());
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
}
void updateSessionVarsCache(Map<String, String> variables) {
@@ -259,7 +257,7 @@ public class SessionConnection implements Closeable {
}
}
- public String getSessionVariable(final String varname) throws SQLException {
+ public String getSessionVariable(final String varname) throws NoSuchSessionVariableException {
synchronized (sessionVarsCache) {
// If a desired variable is client side one and exists in the cache, immediately return the variable.
if (sessionVarsCache.containsKey(varname)) {
@@ -271,35 +269,41 @@ public class SessionConnection implements Closeable {
checkSessionAndGet(client);
BlockingInterface stub = client.getStub();
-
+ StringResponse response;
try {
- return stub.getSessionVariable(null, getSessionedString(varname)).getValue();
+ response = stub.getSessionVariable(null, getSessionedString(varname));
} catch (ServiceException e) {
throw new RuntimeException(e);
}
+
+ if (isThisError(response.getState(), NO_SUCH_SESSION_VARIABLE)) {
+ throw new NoSuchSessionVariableException(response.getState());
+ }
+
+ ensureOk(response.getState());
+ return response.getValue();
}
- public Boolean existSessionVariable(final String varname) throws SQLException {
+ public Boolean existSessionVariable(final String varname) {
+ ReturnState state;
try {
final BlockingInterface stub = getTMStub();
- ReturnState state = stub.existSessionVariable(null, getSessionedString(varname));
-
- if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) {
- return false;
- } else if (isError(state)){
- throw SQLExceptionUtil.toSQLException(state);
- }
-
- return isSuccess(state);
-
+ state = stub.existSessionVariable(null, getSessionedString(varname));
} catch (ServiceException e) {
throw new RuntimeException(e);
}
+
+ if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) {
+ return false;
+ }
+
+ ensureOk(state);
+ return true;
}
- public Map<String, String> getAllSessionVariables() throws SQLException {
+ public Map<String, String> getAllSessionVariables() {
NettyClientBase client = getTajoMasterConnection();
checkSessionAndGet(client);
@@ -311,22 +315,29 @@ public class SessionConnection implements Closeable {
throw new RuntimeException(e);
}
- throwIfError(response.getState());
+ ensureOk(response.getState());
return ProtoUtil.convertToMap(response.getValue());
}
- public Boolean selectDatabase(final String databaseName) throws SQLException {
+ public Boolean selectDatabase(final String dbName) throws UndefinedDatabaseException {
BlockingInterface stub = getTMStub();
boolean selected;
try {
- selected = isSuccess(stub.selectDatabase(null, getSessionedString(databaseName)));
+ ReturnState state = stub.selectDatabase(null, getSessionedString(dbName));
+
+ if (isThisError(state, UNDEFINED_DATABASE)) {
+ throw new UndefinedDatabaseException(dbName);
+ }
+
+ selected = ensureOk(state);
+
} catch (ServiceException e) {
throw new RuntimeException(e);
}
if (selected) {
- this.baseDatabase = databaseName;
+ this.baseDatabase = dbName;
}
return selected;
}
@@ -362,7 +373,7 @@ public class SessionConnection implements Closeable {
return serviceTracker.getClientServiceAddress();
}
- protected void checkSessionAndGet(NettyClientBase client) throws SQLException {
+ protected void checkSessionAndGet(NettyClientBase client) {
if (sessionId == null) {
@@ -390,7 +401,7 @@ public class SessionConnection implements Closeable {
LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
}
} else {
- throw SQLExceptionUtil.toSQLException(response.getState());
+ throw new InvalidClientSessionException(sessionId.getId());
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index b66d451..8c167a4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -26,8 +26,10 @@ import org.apache.tajo.annotation.ThreadSafe;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.TajoException;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
@@ -90,7 +92,7 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
// QueryClient wrappers
/*------------------------------------------------------------------------*/
- public void closeQuery(final QueryId queryId) throws SQLException {
+ public void closeQuery(final QueryId queryId) {
queryClient.closeQuery(queryId);
}
@@ -98,23 +100,23 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
queryClient.closeNonForwardQuery(queryId);
}
- public SubmitQueryResponse executeQuery(final String sql) throws SQLException {
+ public SubmitQueryResponse executeQuery(final String sql) {
return queryClient.executeQuery(sql);
}
- public SubmitQueryResponse executeQueryWithJson(final String json) throws SQLException {
+ public SubmitQueryResponse executeQueryWithJson(final String json) {
return queryClient.executeQueryWithJson(json);
}
- public ResultSet executeQueryAndGetResult(final String sql) throws SQLException {
+ public ResultSet executeQueryAndGetResult(final String sql) throws TajoException {
return queryClient.executeQueryAndGetResult(sql);
}
- public ResultSet executeJsonQueryAndGetResult(final String json) throws SQLException {
+ public ResultSet executeJsonQueryAndGetResult(final String json) throws TajoException {
return queryClient.executeJsonQueryAndGetResult(json);
}
- public QueryStatus getQueryStatus(QueryId queryId) throws SQLException {
+ public QueryStatus getQueryStatus(QueryId queryId) {
return queryClient.getQueryStatus(queryId);
}
@@ -134,15 +136,15 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
return queryClient.fetchNextQueryResult(queryId, fetchRowNum);
}
- public boolean updateQuery(final String sql) throws SQLException {
+ public boolean updateQuery(final String sql) throws TajoException {
return queryClient.updateQuery(sql);
}
- public boolean updateQueryWithJson(final String json) throws SQLException {
+ public boolean updateQueryWithJson(final String json) throws TajoException {
return queryClient.updateQueryWithJson(json);
}
- public QueryStatus killQuery(final QueryId queryId) throws SQLException {
+ public QueryStatus killQuery(final QueryId queryId) {
return queryClient.killQuery(queryId);
}
@@ -150,7 +152,7 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
return queryClient.getRunningQueryList();
}
- public List<BriefQueryInfo> getFinishedQueryList() throws SQLException {
+ public List<BriefQueryInfo> getFinishedQueryList() {
return queryClient.getFinishedQueryList();
}
@@ -178,54 +180,54 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
// CatalogClient wrappers
/*------------------------------------------------------------------------*/
- public boolean createDatabase(final String databaseName) throws SQLException {
+ public boolean createDatabase(final String databaseName) throws DuplicateDatabaseException {
return catalogClient.createDatabase(databaseName);
}
- public boolean existDatabase(final String databaseName) throws SQLException {
+ public boolean existDatabase(final String databaseName) {
return catalogClient.existDatabase(databaseName);
}
- public boolean dropDatabase(final String databaseName) throws SQLException {
+ public boolean dropDatabase(final String databaseName) throws UndefinedDatabaseException {
return catalogClient.dropDatabase(databaseName);
}
- public List<String> getAllDatabaseNames() throws SQLException {
+ public List<String> getAllDatabaseNames() {
return catalogClient.getAllDatabaseNames();
}
- public boolean existTable(final String tableName) throws SQLException {
+ public boolean existTable(final String tableName) {
return catalogClient.existTable(tableName);
}
public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path,
- final TableMeta meta) throws SQLException {
+ final TableMeta meta) throws DuplicateTableException {
return catalogClient.createExternalTable(tableName, schema, path, meta);
}
public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path,
final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
- throws SQLException {
+ throws DuplicateTableException {
return catalogClient.createExternalTable(tableName, schema, path, meta, partitionMethodDesc);
}
- public boolean dropTable(final String tableName) throws SQLException {
+ public boolean dropTable(final String tableName) throws UndefinedTableException {
return dropTable(tableName, false);
}
- public boolean dropTable(final String tableName, final boolean purge) throws SQLException {
+ public boolean dropTable(final String tableName, final boolean purge) throws UndefinedTableException {
return catalogClient.dropTable(tableName, purge);
}
- public List<String> getTableList(@Nullable final String databaseName) throws SQLException {
+ public List<String> getTableList(@Nullable final String databaseName) {
return catalogClient.getTableList(databaseName);
}
- public TableDesc getTableDesc(final String tableName) throws SQLException {
+ public TableDesc getTableDesc(final String tableName) throws UndefinedTableException {
return catalogClient.getTableDesc(tableName);
}
- public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws SQLException {
+ public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) {
return catalogClient.getFunctions(functionName);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
index 358f1a0..c79b756 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -58,7 +58,7 @@ public class TajoClientUtil {
return !isQueryWaitingForSchedule(state) && !isQueryRunning(state);
}
- public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) throws SQLException {
+ public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) {
QueryStatus status = client.getQueryStatus(queryId);
while(!isQueryComplete(status.getState())) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java
new file mode 100644
index 0000000..8dce7c4
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.exception.TajoException;
+
+import java.io.Closeable;
+import java.sql.ResultSet;
+
+/**
+ * ClientDelegate is a delegate for various wire protocols such as protocol buffers, a REST API, and a proxy.
+ *
+ * It extends {@link Closeable}, so implementations are expected to be closed by their owner.
+ */
+public interface ClientDelegate extends Closeable {
+
+ /**
+  * Runs the given SQL statement through the update path.
+  *
+  * NOTE(review): the meaning of the returned int is not defined here; confirm against
+  * the implementations before relying on it.
+  *
+  * @param sql SQL text to run
+  * @return an int result (semantics to be confirmed)
+  * @throws TajoException as declared by implementations
+  */
+ int executeUpdate(String sql) throws TajoException;
+
+ /**
+  * Runs the given SQL statement and returns its result set.
+  *
+  * @param sql SQL text to run
+  * @return the result set of the statement
+  * @throws TajoException as declared by implementations
+  */
+ ResultSet executeSQL(String sql) throws TajoException;
+
+ /**
+  * Submits the given SQL statement and returns a {@link QueryFuture} for it.
+  *
+  * @param sql SQL text to submit
+  * @return a future handle for the submitted statement
+  * @throws TajoException as declared by implementations
+  */
+ QueryFuture executeSQLAsync(String sql) throws TajoException;
+
+ /**
+  * @return the name of the current database
+  */
+ String currentDB();
+
+ /**
+  * Selects the given database.
+  *
+  * @param db database name
+  * @throws UndefinedDatabaseException as declared; presumably when {@code db} does not exist
+  */
+ void selectDB(String db) throws UndefinedDatabaseException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
new file mode 100644
index 0000000..44721b3
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.client.v2.exception.ClientUnableToConnectException;
+
+import java.util.Map;
+
+/**
+ * Factory for {@link ClientDelegate} instances. Both overloads currently build a
+ * {@link LegacyClientDelegate}.
+ */
+public class ClientDelegateFactory {
+
+  /**
+   * Builds the default delegate for a master addressed by host and port.
+   *
+   * @param host  master host name
+   * @param port  master client port
+   * @param props client properties; may be null
+   * @return a new {@link LegacyClientDelegate}
+   * @throws ClientUnableToConnectException as declared by this factory
+   */
+  public static ClientDelegate newDefaultDelegate(String host, int port, @Nullable Map<String, String> props)
+      throws ClientUnableToConnectException {
+    final ClientDelegate delegate = new LegacyClientDelegate(host, port, props);
+    return delegate;
+  }
+
+  /**
+   * Builds the default delegate for a master located through a {@link ServiceDiscovery}.
+   *
+   * @param discovery service discovery used to locate the master
+   * @param props     client properties; may be null
+   * @return a new {@link LegacyClientDelegate}
+   * @throws ClientUnableToConnectException as declared by this factory
+   */
+  public static ClientDelegate newDefaultDelegate(ServiceDiscovery discovery, @Nullable Map<String, String> props)
+      throws ClientUnableToConnectException {
+    final ClientDelegate delegate = new LegacyClientDelegate(discovery, props);
+    return delegate;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java
new file mode 100644
index 0000000..b6a00e2
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+/**
+ * Predicates over {@link QueryState}.
+ */
+public class ClientUtil {
+
+  /**
+   * @param state query state to test
+   * @return true unless {@code state} is {@code ERROR} or {@code FAILED}
+   *         (a null state therefore yields true)
+   */
+  public static boolean isOk(QueryState state) {
+    return state != QueryState.ERROR && state != QueryState.FAILED;
+  }
+
+  /**
+   * @param state query state to test
+   * @return true when {@code state} is {@code ERROR} or {@code FAILED}
+   *         (a null state therefore yields false)
+   */
+  public static boolean isFailed(QueryState state) {
+    final boolean errored = state == QueryState.ERROR;
+    final boolean failed = state == QueryState.FAILED;
+    return errored || failed;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java
new file mode 100644
index 0000000..ac6283e
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import java.util.EventListener;
+
+/**
+ * Callback interface for completion notifications.
+ *
+ * @param <V> type of the object handed to the callback
+ */
+public interface FutureListener<V> extends EventListener {
+ /**
+  * Callback invoked with the object being listened to.
+  *
+  * @param future the object the listener was registered on
+  */
+ void processingCompleted(V future);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
new file mode 100644
index 0000000..a17311b
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.client.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.tajo.exception.ReturnStateUtil.ensureOk;
+
+@ThreadSafe
+public class LegacyClientDelegate extends SessionConnection implements ClientDelegate {
+
+ private QueryClientImpl queryClient;
+ private final ExecutorService executor = Executors.newFixedThreadPool(8);
+
+ /**
+  * Creates a delegate for a master at the given host and port.
+  *
+  * The address is wrapped in a DummyServiceTracker, no base database is passed (null), and
+  * {@code props} is wrapped in a KeyValueSet.
+  *
+  * NOTE(review): ClientDelegateFactory declares {@code props} as @Nullable; confirm that
+  * KeyValueSet accepts a null map.
+  *
+  * @param host  master host name
+  * @param port  master port
+  * @param props client properties
+  */
+ public LegacyClientDelegate(String host, int port, Map<String, String> props) {
+ super(new DummyServiceTracker(NetUtils.createSocketAddr(host, port)), null, new KeyValueSet(props));
+ queryClient = new QueryClientImpl(this);
+ }
+
+ /**
+  * Creates a delegate whose master is located through a {@link ServiceDiscovery}.
+  *
+  * The discovery is wrapped in a DelegateServiceTracker, no base database is passed (null), and
+  * {@code props} is wrapped in a KeyValueSet.
+  *
+  * NOTE(review): ClientDelegateFactory declares {@code props} as @Nullable; confirm that
+  * KeyValueSet accepts a null map.
+  *
+  * @param discovery service discovery used to locate the master
+  * @param props     client properties
+  */
+ public LegacyClientDelegate(ServiceDiscovery discovery, Map<String, String> props) {
+ super(new DelegateServiceTracker(discovery), null, new KeyValueSet(props));
+ queryClient = new QueryClientImpl(this);
+ }
+
+ /**
+  * Runs {@code sql} through {@code queryClient.updateQuery}.
+  *
+  * The value returned by {@code updateQuery} is discarded and this method always returns 0.
+  *
+  * @param sql SQL text to run
+  * @return always 0
+  * @throws TajoException if {@code updateQuery} throws it
+  */
+ @Override
+ public int executeUpdate(String sql) throws TajoException {
+ queryClient.updateQuery(sql);
+ return 0;
+ }
+
+ /**
+  * Runs {@code sql} and blocks until its result set is available.
+  *
+  * This submits the statement through {@link #executeSQLAsync(String)} and waits on the
+  * returned future.
+  *
+  * @param sql SQL text to run
+  * @return the result set produced by the future
+  * @throws TajoException if submitting the statement fails
+  * @throws RuntimeException wrapping an {@link InterruptedException} if the wait is interrupted
+  *         (the thread's interrupt flag is restored first), or wrapping an
+  *         {@link ExecutionException} if the asynchronous execution failed
+  */
+ @Override
+ public ResultSet executeSQL(String sql) throws TajoException {
+   try {
+     return executeSQLAsync(sql).get();
+   } catch (InterruptedException e) {
+     // Catching InterruptedException clears the interrupt flag; set it again so callers
+     // further up the stack can still observe the interruption.
+     Thread.currentThread().interrupt();
+     throw new RuntimeException(e);
+   } catch (ExecutionException e) {
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+  * Submits {@code sql} and returns a future matching the result type of the response.
+  *
+  * <ul>
+  *   <li>ENCLOSED: a future wrapping a result set built directly from the submit response
+  *       (the last argument 200 is presumably the fetch size; confirm against
+  *       TajoClientUtil.createResultSet)</li>
+  *   <li>FETCH: an AsyncQueryFuture that is handed to this delegate's executor</li>
+  *   <li>anything else: a QueryFutureForNoFetch</li>
+  * </ul>
+  *
+  * @param sql SQL text to submit
+  * @return the future for the submitted statement
+  * @throws TajoException as declared; the response state is checked with
+  *         ClientExceptionUtil.throwIfError before any future is built
+  */
+ @Override
+ public QueryFuture executeSQLAsync(String sql) throws TajoException {
+   final ClientProtos.SubmitQueryResponse submitted = queryClient.executeQuery(sql);
+   ClientExceptionUtil.throwIfError(submitted.getState());
+
+   final QueryId submittedId = new QueryId(submitted.getQueryId());
+   final QueryFuture result;
+
+   switch (submitted.getResultType()) {
+     case ENCLOSED: {
+       final ResultSet enclosedRows = TajoClientUtil.createResultSet(this.queryClient, submitted, 200);
+       result = new QueryFutureForEnclosed(submittedId, enclosedRows);
+       break;
+     }
+     case FETCH: {
+       final AsyncQueryFuture poller = new AsyncQueryFuture(submittedId);
+       executor.execute(poller);
+       result = poller;
+       break;
+     }
+     default:
+       result = new QueryFutureForNoFetch(submittedId);
+       break;
+   }
+
+   return result;
+ }
+
+ /**
+  * @return the value of the inherited {@code getCurrentDatabase()}
+  */
+ @Override
+ public String currentDB() {
+ return getCurrentDatabase();
+ }
+
+ /**
+  * Selects {@code db} through the inherited {@code selectDatabase}; its return value is discarded.
+  *
+  * @param db database name
+  * @throws UndefinedDatabaseException if {@code selectDatabase} throws it
+  */
+ @Override
+ public void selectDB(String db) throws UndefinedDatabaseException {
+ selectDatabase(db);
+ }
+
+ /**
+  * QueryFuture that is already finished when it is created: it reports COMPLETED, full
+  * progress, and hands back TajoClientUtil.NULL_RESULT_SET from both get() variants.
+  */
+ private class QueryFutureForNoFetch implements QueryFuture {
+ protected final QueryId id;
+ // Creation time of this future; reported as both start and finish time.
+ private final long now = System.currentTimeMillis();
+
+ QueryFutureForNoFetch(QueryId id) {
+ this.id = id;
+ }
+
+ @Override
+ public String id() {
+ return id.toString();
+ }
+
+ @Override
+ public String queue() {
+ return "default";
+ }
+
+ @Override
+ public QueryState state() {
+ return QueryState.COMPLETED;
+ }
+
+ @Override
+ public float progress() {
+ return 1.0f;
+ }
+
+ @Override
+ public boolean isOk() {
+ return true;
+ }
+
+ @Override
+ public boolean isSuccessful() {
+ return true;
+ }
+
+ @Override
+ public boolean isFailed() {
+ return false;
+ }
+
+ @Override
+ public boolean isKilled() {
+ return false;
+ }
+
+ @Override
+ public UserRoleInfo user() {
+ return UserRoleInfo.getCurrentUser();
+ }
+
+ // Intentionally empty: this future is always reported as done.
+ @Override
+ public void kill() {
+ }
+
+ // NOTE(review): returns 0 while startTime()/finishTime() return the creation time; confirm this is intended.
+ @Override
+ public long submitTime() {
+ return 0;
+ }
+
+ @Override
+ public long startTime() {
+ return now;
+ }
+
+ @Override
+ public long finishTime() {
+ return now;
+ }
+
+ @Override
+ public void release() {
+ queryClient.closeQuery(id);
+ }
+
+ // The parameter is the listener (despite its name); it is invoked synchronously on the caller's thread.
+ @Override
+ public void addListener(FutureListener<QueryFuture> future) {
+ future.processingCompleted(this);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ @Override
+ public ResultSet get() {
+ return TajoClientUtil.NULL_RESULT_SET;
+ }
+
+ // The timeout arguments are ignored; the result is returned immediately.
+ @Override
+ public ResultSet get(long timeout, TimeUnit unit) {
+ return TajoClientUtil.NULL_RESULT_SET;
+ }
+ }
+
+ /**
+  * Already-finished QueryFuture that carries a result set supplied at construction time.
+  * Everything except the two get() variants is inherited from QueryFutureForNoFetch.
+  */
+ private class QueryFutureForEnclosed extends QueryFutureForNoFetch {
+ private final ResultSet resultSet;
+ QueryFutureForEnclosed(QueryId id, ResultSet resultSet) {
+ super(id);
+ this.resultSet = resultSet;
+ }
+
+ @Override
+ public ResultSet get() {
+ return resultSet;
+ }
+
+ // The timeout arguments are ignored; the stored result set is returned immediately.
+ @Override
+ public ResultSet get(long timeout, TimeUnit unit) {
+ return resultSet;
+ }
+ }
+
+ private class AsyncQueryFuture extends AbstractFuture<ResultSet> implements QueryFuture, Runnable {
+ private final QueryId queryId;
+ private volatile QueryState lastState;
+ private volatile float progress;
+ private final long submitTime = System.currentTimeMillis();
+ private volatile long startTime = 0;
+ private volatile long finishTime = 0;
+
+ /**
+  * Creates a future for the given query; the recorded state starts as SCHEDULED.
+  *
+  * @param queryId id of the query this future tracks
+  */
+ public AsyncQueryFuture(QueryId queryId) {
+ this.queryId = queryId;
+ this.lastState = QueryState.SCHEDULED;
+ }
+
+ /** @return the string form of the query id */
+ @Override
+ public String id() {
+ return queryId.toString();
+ }
+
+ /** @return the result of ClientUtil.isOk applied to the most recently recorded state */
+ @Override
+ public boolean isOk() {
+ return ClientUtil.isOk(lastState);
+ }
+
+ /** @return true when the most recently recorded state is COMPLETED */
+ @Override
+ public boolean isSuccessful() {
+ return lastState == QueryState.COMPLETED;
+ }
+
+ /** @return the result of ClientUtil.isFailed applied to the most recently recorded state */
+ @Override
+ public boolean isFailed() {
+ return ClientUtil.isFailed(lastState);
+ }
+
+ /**
+  * Asks queryClient for the current query status and checks whether it is QUERY_KILLED.
+  *
+  * NOTE(review): unlike isOk()/isSuccessful()/isFailed(), this does not use the cached
+  * lastState; every call goes through queryClient.getQueryStatus. Confirm that is intended.
+  *
+  * @return true when the fetched status is QUERY_KILLED
+  */
+ @Override
+ public boolean isKilled() {
+ return queryClient.getQueryStatus(queryId).getState() == TajoProtos.QueryState.QUERY_KILLED;
+ }
+
+ @Override
+ public QueryState state() {
+ return lastState;
+ }
+
+ @Override
+ public String queue() {
+ return "default";
+ }
+
+ @Override
+ public UserRoleInfo user() {
+ return UserRoleInfo.getCurrentUser();
+ }
+
+ @Override
+ public float progress() {
+ return progress;
+ }
+
+ @Override
+ public void kill() {
+ queryClient.killQuery(queryId).getState();
+ }
+
+ @Override
+ public long submitTime() {
+ return submitTime;
+ }
+
+ @Override
+ public long startTime() {
+ return startTime;
+ }
+
+ @Override
+ public long finishTime() {
+ return finishTime;
+ }
+
+ @Override
+ public void release() {
+ queryClient.closeQuery(queryId);
+ }
+
+ @Override
+ public void addListener(final FutureListener<QueryFuture> listener) {
+ final QueryFuture f = this;
+ addListener(new Runnable() {
+ @Override
+ public void run() {
+ listener.processingCompleted(f);
+ }},
+ executor);
+ }
+
+ private void updateState(GetQueryStatusResponse lastState) {
+ this.startTime = lastState.getSubmitTime();
+ this.finishTime = lastState.getFinishTime();
+ this.progress = lastState.getProgress();
+ this.lastState = convert(lastState.getQueryState());
+ }
+
+ GetQueryStatusResponse waitCompletion() {
+ GetQueryStatusResponse response = queryClient.getRawQueryStatus(queryId);
+ ensureOk(response.getState());
+ updateState(response);
+
+ while(!TajoClientUtil.isQueryComplete(response.getQueryState())) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ response = queryClient.getRawQueryStatus(queryId);
+ updateState(response);
+ ensureOk(response.getState());
+ }
+ return response;
+ }
+
+ @Override
+ public void run() {
+ GetQueryStatusResponse finalResponse;
+ try {
+ finalResponse = waitCompletion();
+ } catch (Throwable t) {
+ setException(t);
+ return;
+ }
+
+ if (finalResponse.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ if (finalResponse.hasHasResult()) {
+ set(queryClient.getQueryResult(queryId));
+ } else { // when update
+ set(TajoClientUtil.NULL_RESULT_SET);
+ }
+ } else {
+ cancel(false); // failed
+ set(TajoClientUtil.NULL_RESULT_SET);
+ }
+ }
+ }
+
+ /**
+ * A {@link ServiceTracker} backed by a {@link ServiceDiscovery}. Only
+ * {@link #getClientServiceAddress()} is answered from the discovery;
+ * {@link #isHighAvailable()} returns {@code false}, {@link #getUmbilicalAddress()}
+ * returns {@code null}, and every other method throws {@code UnimplementedException}.
+ */
+ public static class DelegateServiceTracker implements ServiceTracker {
+
+ private final ServiceDiscovery discovery;
+ /**
+ * @param discovery the discovery used to look up the client service address
+ */
+ DelegateServiceTracker(ServiceDiscovery discovery) {
+ this.discovery = discovery;
+ }
+
+ @Override
+ public boolean isHighAvailable() {
+ return false;
+ }
+
+ // NOTE(review): returns null, whereas the other unsupported lookups throw
+ // UnimplementedException - confirm that callers handle a null address.
+ @Override
+ public InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException {
+ return null;
+ }
+
+ /**
+ * @return the address reported by {@code discovery.clientAddress()}
+ */
+ @Override
+ public InetSocketAddress getClientServiceAddress() throws ServiceTrackerException {
+ return discovery.clientAddress();
+ }
+
+ @Override
+ public InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public InetSocketAddress getCatalogAddress() throws ServiceTrackerException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public int getState(String masterName, TajoConf conf) throws ServiceTrackerException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public int formatHA(TajoConf conf) throws ServiceTrackerException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public List<String> getMasters(TajoConf conf) throws ServiceTrackerException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public void register() throws IOException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public void delete() throws IOException {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public boolean isActiveMaster() {
+ throw new UnimplementedException();
+ }
+
+ @Override
+ public List<TajoMasterInfo> getMasters() throws IOException {
+ throw new UnimplementedException();
+ }
+ }
+
+ /**
+ * Maps a {@link TajoProtos.QueryState} to the {@link QueryState} of the v2 client API:
+ * NEW, INIT and NOT_ASSIGNED become SCHEDULED; MASTER_INIT, MASTER_LAUNCHED and RUNNING
+ * become RUNNING; KILL_WAIT becomes KILLING; KILLED, FAILED and ERROR keep their names;
+ * SUCCEEDED becomes COMPLETED.
+ *
+ * @param state the protocol-level state to convert
+ * @return the corresponding client-level state
+ * @throws RuntimeException if the given state has no mapping
+ */
+ public static QueryState convert(TajoProtos.QueryState state) {
+ switch (state) {
+ case QUERY_NEW:
+ case QUERY_INIT:
+ case QUERY_NOT_ASSIGNED:
+ return QueryState.SCHEDULED;
+
+ case QUERY_MASTER_INIT:
+ case QUERY_MASTER_LAUNCHED:
+ case QUERY_RUNNING:
+ return QueryState.RUNNING;
+
+ case QUERY_KILL_WAIT:
+ return QueryState.KILLING;
+
+ case QUERY_KILLED:
+ return QueryState.KILLED;
+
+ case QUERY_FAILED:
+ return QueryState.FAILED;
+
+ case QUERY_ERROR:
+ return QueryState.ERROR;
+
+ case QUERY_SUCCEEDED:
+ return QueryState.COMPLETED;
+
+ default:
+ throw new RuntimeException("Unknown state:" + state.name());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java
new file mode 100644
index 0000000..f1604cd
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.auth.UserRoleInfo;
+
+import java.sql.ResultSet;
+import java.util.concurrent.Future;
+
+/**
+ * A {@link Future} over the {@link ResultSet} of a query, extended with accessors for
+ * the query's id, queue, state, progress, user and timestamps, and with operations to
+ * kill the query, release it, and register a completion listener.
+ */
+public interface QueryFuture extends Future<ResultSet> {
+ /**
+ * Get a query id
+ *
+ * @return query id
+ */
+ String id();
+
+ /**
+ * Get the name of the queue in which the query is running
+ *
+ * @return queue name
+ */
+ String queue();
+
+ /**
+ * Get a query state
+ *
+ * @return query state
+ */
+ QueryState state();
+
+ /**
+ * Get a normalized progress (0 ~ 1.0f) of a running query
+ *
+ * @return progress
+ */
+ float progress();
+
+ /**
+ * Get whether the query state is normal, i.e., the query is submitted or running.
+ *
+ * @return True if a query state is normal
+ */
+ boolean isOk();
+
+ /**
+ * Get whether the query is successfully completed or not.
+ *
+ * @return True if the query is successfully completed.
+ */
+ boolean isSuccessful();
+
+ /**
+ * Get whether the query is aborted due to an error.
+ *
+ * @return True if the query is aborted due to an error.
+ */
+ boolean isFailed();
+
+ /**
+ * Get whether the query is killed. This is equivalent to
+ * {@link Future#cancel}.
+ *
+ * @return True if the query is already killed.
+ */
+ boolean isKilled();
+
+ /**
+ * Get the user information
+ *
+ * @return UserRoleInfo
+ */
+ UserRoleInfo user();
+
+ /**
+ * Kill this query
+ */
+ void kill();
+
+ /**
+ * Get the time when a query is submitted.
+ * This time can be different from {@link QueryFuture#startTime}
+ * due to scheduling delay.
+ *
+ * @return Milliseconds since epoch
+ */
+ long submitTime();
+
+ /**
+ * Get the time when a query is actually launched.
+ *
+ * @return Milliseconds since epoch
+ */
+ long startTime();
+
+ /**
+ * Get the time when a query is finished.
+ *
+ * @return Milliseconds since epoch
+ */
+ long finishTime();
+
+ /**
+ * Release a query future. It will be automatically released after the session invalidation.
+ */
+ void release();
+
+ /**
+ * Add a listener which will be executed after this query is completed, error, failed or killed.
+ *
+ * @param future the listener to be notified (despite its name, this is the listener, not a future)
+ */
+ void addListener(FutureListener<QueryFuture> future);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java
new file mode 100644
index 0000000..24ea386
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+/**
+ * States of a query as exposed by the v2 client API.
+ */
+public enum QueryState {
+ /** Successfully submitted */
+ SCHEDULED,
+ /** Running */
+ RUNNING,
+ /** Error before a query execution */
+ ERROR,
+ /** Failure after a query launches */
+ FAILED,
+ /** Killed */
+ KILLED,
+ /** Waiting for the kill to complete */
+ KILLING,
+ /** Successfully completed */
+ COMPLETED
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java
new file mode 100644
index 0000000..e69ca8a
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Client service discovery interface
+ */
+public interface ServiceDiscovery {
+ /**
+ * Get the address of the client service.
+ *
+ * @return socket address of the client service
+ */
+ InetSocketAddress clientAddress();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
new file mode 100644
index 0000000..08a921d
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.client.v2.exception.ClientUnableToConnectException;
+import org.apache.tajo.exception.TajoException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+/**
+ * Entry point of the version 2 client API. Every operation is forwarded to a
+ * {@code ClientDelegate} obtained from {@code ClientDelegateFactory} at construction time.
+ */
+public class TajoClient implements Closeable {
+ /**
+ * default client port number
+ */
+ public static final int DEFAULT_PORT = 26002;
+
+ private final ClientDelegate delegate;
+
+ /**
+ * Initialize TajoClient with a hostname and default port 26002.
+ *
+ * @param host hostname to connect
+ */
+ public TajoClient(String host) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, null);
+ }
+
+ /**
+ * Initialize TajoClient with a hostname and default port 26002.
+ *
+ * @param host Hostname to connect
+ * @param properties Connection properties
+ */
+ public TajoClient(String host, Map<String, String> properties) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, properties);
+ }
+
+ /**
+ * Initialize TajoClient with a hostname and port
+ *
+ * @param host Hostname to connect
+ * @param port Port number to connect
+ */
+ public TajoClient(String host, int port) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(host, port, null);
+ }
+
+ /**
+ * Initialize TajoClient with a hostname and port
+ *
+ * @param host Hostname to connect
+ * @param port Port number to connect
+ * @param properties Connection properties
+ */
+ public TajoClient(String host, int port, Map<String, String> properties) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(host, port, properties);
+ }
+
+ /**
+ * Initialize TajoClient via service discovery protocol
+ *
+ * @param discovery Service discovery
+ */
+ public TajoClient(ServiceDiscovery discovery) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(discovery, null);
+ }
+
+ /**
+ * Initialize TajoClient via service discovery protocol
+ *
+ * @param discovery Service discovery
+ * @param properties Connection properties
+ */
+ public TajoClient(ServiceDiscovery discovery, Map<String, String> properties) throws ClientUnableToConnectException {
+ delegate = ClientDelegateFactory.newDefaultDelegate(discovery, properties);
+ }
+
+ /**
+ * Submit and execute the given SQL statement, which may be an <code>INSERT (INTO)</code>,
+ * or <code>CREATE TABLE AS SELECT</code> statement or an SQL statement that returns nothing,
+ * such as an SQL DDL statement.
+ *
+ * @param sql a SQL statement
+ * @return inserted row number
+ * @throws TajoException
+ */
+ public int executeUpdate(String sql) throws TajoException {
+ return delegate.executeUpdate(sql);
+ }
+
+ /**
+ * Submit a SQL query statement
+ *
+ * @param sql a SQL statement
+ * @return result set returned by the delegate for the statement
+ * @throws TajoException
+ */
+ public ResultSet executeQuery(String sql) throws TajoException {
+ return delegate.executeSQL(sql);
+ }
+
+ /**
+ * Execute a SQL statement through asynchronous API
+ *
+ * @param sql a SQL statement
+ * @return a QueryFuture for the submitted statement
+ * @throws TajoException
+ */
+ public QueryFuture executeQueryAsync(String sql) throws TajoException {
+ return delegate.executeSQLAsync(sql);
+ }
+
+ /**
+ * Close the underlying delegate.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ /**
+ * Select working database
+ *
+ * @param database Database name
+ * @throws UndefinedDatabaseException
+ */
+ public void selectDB(String database) throws UndefinedDatabaseException {
+ delegate.selectDB(database);
+ }
+
+ /**
+ * Get the current working database
+ *
+ * @return Current working database
+ */
+ public String currentDB() {
+ return delegate.currentDB();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java
new file mode 100644
index 0000000..a7fb08a
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2.exception;
+
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.exception.TajoRuntimeException;
+
+/**
+ * Runtime exception carrying the {@code CLIENT_CONNECTION_EXCEPTION} result code.
+ */
+public class ClientConnectionException extends TajoRuntimeException {
+ /**
+ * Only the message of the given throwable is passed on; the throwable itself is not
+ * kept as the cause.
+ * NOTE(review): the message registered for CLIENT_CONNECTION_EXCEPTION in ErrorMessages
+ * ("Client connection to '%s' has error: %s") is declared with two arguments, but a
+ * single argument is supplied here - presumably a mismatch; confirm how
+ * TajoRuntimeException formats its message.
+ *
+ * @param t the underlying failure
+ */
+ public ClientConnectionException(Throwable t) {
+ super(Errors.ResultCode.CLIENT_CONNECTION_EXCEPTION, t.getMessage());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java
new file mode 100644
index 0000000..e567d7d
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2.exception;
+
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.exception.TajoException;
+
+/**
+ * Checked exception carrying the {@code CLIENT_UNABLE_TO_ESTABLISH_CONNECTION} result code.
+ */
+public class ClientUnableToConnectException extends TajoException {
+ /**
+ * Creates the exception without message arguments.
+ * NOTE(review): the message registered for CLIENT_UNABLE_TO_ESTABLISH_CONNECTION in
+ * ErrorMessages ("Client is unable to establish connection to '%s'") is declared with
+ * one argument, but none is supplied here - presumably a mismatch; confirm how
+ * ErrorMessages.getMessage treats it.
+ */
+ public ClientUnableToConnectException() {
+ super(Errors.ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
index 869d7c4..8f15710 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -87,6 +87,6 @@ public class FetchResultSet extends TajoResultSetBase {
currentResultSet.close();
currentResultSet = null;
}
- tajoClient.closeNonForwardQuery(queryId);
+ tajoClient.closeQuery(queryId);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java b/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
index 7cf6e1e..574dc3b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
@@ -18,14 +18,17 @@
package org.apache.tajo.exception;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
+
import static org.apache.tajo.error.Errors.ResultCode.AMBIGUOUS_COLUMN;
public class AmbiguousColumnException extends TajoException {
private static final long serialVersionUID = 3102675985226352347L;
- /**
- * @param fieldName
- */
+ public AmbiguousColumnException(ReturnState state) {
+ super(state);
+ }
+
public AmbiguousColumnException(String fieldName) {
super(AMBIGUOUS_COLUMN, fieldName);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
index d50164d..3b646e1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
@@ -21,6 +21,7 @@ package org.apache.tajo.exception;
import com.google.common.collect.Maps;
import org.apache.tajo.error.Errors.ResultCode;
import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.StringUtils;
import java.util.Map;
@@ -99,6 +100,9 @@ public class ErrorMessages {
ADD_MESSAGE(AMBIGUOUS_PARTITION_DIRECTORY, "There is a directory which is assumed to be a partitioned directory" +
" : '%s'", 1);
+
+ ADD_MESSAGE(CLIENT_CONNECTION_EXCEPTION, "Client connection to '%s' has error: %s", 2);
+ ADD_MESSAGE(CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, "Client is unable to establish connection to '%s'", 1);
}
private static void ADD_MESSAGE(ResultCode code, String msgFormat) {
@@ -138,7 +142,7 @@ public class ErrorMessages {
}
} else {
- throw new TajoRuntimeException(code, args);
+ throw new TajoInternalError("Argument mismatch: code=" + code.name() + ", args=" + StringUtils.join(args));
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java b/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..c9ed78d
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.exception;
+
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+/**
+ * Checked exception for the {@code NO_SUCH_SESSION_VARIABLE} result code. It can be built
+ * either from a {@code ReturnState} or from the name of the variable.
+ */
+public class NoSuchSessionVariableException extends TajoException {
+
+ /**
+ * @param state the return state handed to the {@code TajoException(ReturnState)} constructor
+ */
+ public NoSuchSessionVariableException(PrimitiveProtos.ReturnState state) {
+ super(state);
+ }
+
+ /**
+ * @param variableName name of the session variable, used as the message argument
+ */
+ public NoSuchSessionVariableException(String variableName) {
+ super(Errors.ResultCode.NO_SUCH_SESSION_VARIABLE, variableName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
index fb6b9a5..81554c4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
@@ -45,10 +45,11 @@ public class ReturnStateUtil {
OK = builder.build();
}
- public static void ensureOk(ReturnState state) {
+ public static boolean ensureOk(ReturnState state) {
if (isError(state)) {
throw new TajoRuntimeException(state);
}
+ return true;
}
public static StringListResponse returnStringList(Collection<String> values) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
index 10b5aff..deeb7f9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
@@ -19,6 +19,7 @@
package org.apache.tajo.exception;
import com.google.common.collect.Maps;
+import org.apache.log4j.spi.ErrorCode;
import org.apache.tajo.error.Errors.ResultCode;
import org.apache.tajo.exception.ErrorMessages;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
@@ -54,27 +55,34 @@ public class SQLExceptionUtil {
}
}
- public static SQLException toSQLException(ReturnState state) throws SQLException {
-
- if (SQLSTATES.containsKey(state.getReturnCode())) {
+ private static SQLException toSQLException(ResultCode code, String message) throws SQLException {
+ if (SQLSTATES.containsKey(code)) {
return new SQLException(
- state.getMessage(),
- SQLSTATES.get(state.getReturnCode()),
- state.getReturnCode().getNumber()
+ message,
+ SQLSTATES.get(code),
+ code.getNumber()
);
} else {
// If there is no SQLState corresponding to error code,
// It will make SQLState '42000' (Syntax Error Or Access Rule Violation).
return new SQLException(
- state.getMessage(),
+ message,
"42000",
ResultCode.SYNTAX_ERROR_OR_ACCESS_RULE_VIOLATION_VALUE
);
}
}
+ public static SQLException toSQLException(TajoException e) throws SQLException {
+ return toSQLException(e.getErrorCode(), e.getMessage());
+ }
+
+ public static SQLException toSQLException(ReturnState state) throws SQLException {
+ return toSQLException(state.getReturnCode(), state.getMessage());
+ }
+
public static SQLException makeSQLException(ResultCode code, String ...args) {
if (SQLSTATES.containsKey(code)) {
return new SQLException(
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
index dbb2748..765ead3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
@@ -19,6 +19,8 @@
package org.apache.tajo.exception;
import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
/**
* Unrecoverable errors
@@ -26,6 +28,11 @@ import org.apache.tajo.error.Errors.ResultCode;
public class TajoError extends Error implements TajoExceptionInterface {
private ResultCode code;
+ public TajoError(ReturnState state) {
+ super(state.getMessage());
+ code = state.getReturnCode();
+ }
+
public TajoError(ResultCode code) {
super(ErrorMessages.getMessage(code));
this.code = code;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
index 781d1a0..e0e2ccb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
@@ -19,6 +19,7 @@
package org.apache.tajo.exception;
import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
/**
* TajoException contains all exceptions with any exact reason.
@@ -27,6 +28,16 @@ import org.apache.tajo.error.Errors.ResultCode;
public class TajoException extends Exception implements TajoExceptionInterface {
private ResultCode code;
+ public TajoException(ReturnState e) {
+ super(e.getMessage());
+ this.code = e.getReturnCode();
+ }
+
+ public TajoException(TajoRuntimeException e) {
+ super(e.getMessage());
+ this.code = e.getErrorCode();
+ }
+
public TajoException(ResultCode code) {
super(ErrorMessages.getMessage(code));
this.code = code;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
index 767c13c..072636b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
@@ -19,12 +19,18 @@
package org.apache.tajo.exception;
import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
/**
* Exception for Internal Bugs and Unexpected exception
*/
public class TajoInternalError extends TajoError {
+ public TajoInternalError(ReturnState state) {
+ super(state);
+ }
+
public TajoInternalError(String message) {
super(ResultCode.INTERNAL_ERROR, message);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
index 9d91946..70586c9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
@@ -19,9 +19,14 @@
package org.apache.tajo.exception;
import org.apache.tajo.error.Errors;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
public class UndefinedOperatorException extends TajoException {
+ public UndefinedOperatorException(ReturnState state) {
+ super(state);
+ }
+
public UndefinedOperatorException(String operation) {
super(Errors.ResultCode.UNDEFINED_OPERATOR, operation);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
index 9ca5539..a7a3915 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
@@ -19,10 +19,15 @@
package org.apache.tajo.exception;
import org.apache.tajo.error.Errors;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
public class UnsupportedException extends TajoRuntimeException {
private static final long serialVersionUID = 6702291354858193578L;
+ public UnsupportedException(ReturnState state) {
+ super(state);
+ }
+
public UnsupportedException(String featureName) {
super(Errors.ResultCode.FEATURE_NOT_SUPPORTED, featureName);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 7489503..60e02a6 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -54,8 +54,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index b4a28db..90c95a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -28,6 +28,7 @@ import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.TajoException;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.FileUtil;
@@ -91,7 +92,7 @@ public abstract class BenchmarkSet {
public abstract void loadQueries() throws IOException;
- public abstract void loadTables() throws SQLException;
+ public abstract void loadTables() throws TajoException;
public String [] getTableNames() {
return schemas.keySet().toArray(new String[schemas.size()]);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 91a3b66..9739767 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.exception.TajoException;
import org.apache.tajo.storage.StorageConstants;
import java.io.File;
@@ -192,7 +193,7 @@ public class TPCH extends BenchmarkSet {
loadQueries(BENCHMARK_DIR);
}
- public void loadTables() throws SQLException {
+ public void loadTables() throws TajoException {
loadTable(LINEITEM);
loadTable(CUSTOMER);
loadTable(CUSTOMER_PARTS);
@@ -206,7 +207,7 @@ public class TPCH extends BenchmarkSet {
}
- public void loadTable(String tableName) throws SQLException {
+ public void loadTable(String tableName) throws TajoException {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
meta.putOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 67f782a..07445a4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -29,6 +29,9 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.AmbiguousFunctionException;
+import org.apache.tajo.catalog.exception.CatalogException;
+import org.apache.tajo.catalog.exception.UndefinedFunctionException;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
@@ -37,10 +40,7 @@ import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.exception.*;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.Target;
@@ -346,31 +346,32 @@ public class GlobalPlanner {
}
}
- private AggregationFunctionCallEval createSumFunction(EvalNode[] args) throws InternalException {
- FunctionDesc functionDesc = getCatalog().getFunction("sum", CatalogProtos.FunctionType.AGGREGATION,
+ private AggregationFunctionCallEval createSumFunction(EvalNode[] args) throws CatalogException {
+ FunctionDesc functionDesc = null;
+ functionDesc = getCatalog().getFunction("sum", CatalogProtos.FunctionType.AGGREGATION,
args[0].getValueType());
return new AggregationFunctionCallEval(functionDesc, args);
}
- private AggregationFunctionCallEval createCountFunction(EvalNode [] args) throws InternalException {
+ private AggregationFunctionCallEval createCountFunction(EvalNode [] args) throws CatalogException {
FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION,
args[0].getValueType());
return new AggregationFunctionCallEval(functionDesc, args);
}
- private AggregationFunctionCallEval createCountRowFunction(EvalNode[] args) throws InternalException {
+ private AggregationFunctionCallEval createCountRowFunction(EvalNode[] args) throws CatalogException {
FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION,
new TajoDataTypes.DataType[]{});
return new AggregationFunctionCallEval(functionDesc, args);
}
- private AggregationFunctionCallEval createMaxFunction(EvalNode [] args) throws InternalException {
+ private AggregationFunctionCallEval createMaxFunction(EvalNode [] args) throws CatalogException {
FunctionDesc functionDesc = getCatalog().getFunction("max", CatalogProtos.FunctionType.AGGREGATION,
args[0].getValueType());
return new AggregationFunctionCallEval(functionDesc, args);
}
- private AggregationFunctionCallEval createMinFunction(EvalNode [] args) throws InternalException {
+ private AggregationFunctionCallEval createMinFunction(EvalNode [] args) throws CatalogException {
FunctionDesc functionDesc = getCatalog().getFunction("min", CatalogProtos.FunctionType.AGGREGATION,
args[0].getValueType());
return new AggregationFunctionCallEval(functionDesc, args);
@@ -428,57 +429,53 @@ public class GlobalPlanner {
*/
private RewrittenFunctions rewriteAggFunctionsForDistinctAggregation(GlobalPlanContext context,
AggregationFunctionCallEval function)
- throws PlanningException {
+ throws TajoException {
LogicalPlan plan = context.plan.getLogicalPlan();
RewrittenFunctions rewritten = null;
- try {
- if (function.getName().equalsIgnoreCase("count")) {
- rewritten = new RewrittenFunctions(1);
-
- if (function.getArgs().length == 0) {
- rewritten.firstStageEvals[0] = createCountRowFunction(function.getArgs());
- } else {
- rewritten.firstStageEvals[0] = createCountFunction(function.getArgs());
- }
- String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
- FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
- rewritten.firstStageTargets[0] = new Target(fieldEval);
- rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval});
- } else if (function.getName().equalsIgnoreCase("sum")) {
- rewritten = new RewrittenFunctions(1);
-
- rewritten.firstStageEvals[0] = createSumFunction(function.getArgs());
- String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
- FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
- rewritten.firstStageTargets[0] = new Target(fieldEval);
- rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval});
-
- } else if (function.getName().equals("max")) {
- rewritten = new RewrittenFunctions(1);
-
- rewritten.firstStageEvals[0] = createMaxFunction(function.getArgs());
- String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
- FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
- rewritten.firstStageTargets[0] = new Target(fieldEval);
- rewritten.secondStageEvals = createMaxFunction(new EvalNode[]{fieldEval});
-
- } else if (function.getName().equals("min")) {
-
- rewritten = new RewrittenFunctions(1);
-
- rewritten.firstStageEvals[0] = createMinFunction(function.getArgs());
- String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
- FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
- rewritten.firstStageTargets[0] = new Target(fieldEval);
- rewritten.secondStageEvals = createMinFunction(new EvalNode[]{fieldEval});
+ if (function.getName().equalsIgnoreCase("count")) {
+ rewritten = new RewrittenFunctions(1);
+ if (function.getArgs().length == 0) {
+ rewritten.firstStageEvals[0] = createCountRowFunction(function.getArgs());
} else {
- throw new PlanningException("Cannot support a mix of other functions");
+ rewritten.firstStageEvals[0] = createCountFunction(function.getArgs());
}
- } catch (InternalException e) {
- LOG.error(e, e);
+ String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+ FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+ rewritten.firstStageTargets[0] = new Target(fieldEval);
+ rewritten.secondStageEvals = createSumFunction(new EvalNode[]{fieldEval});
+ } else if (function.getName().equalsIgnoreCase("sum")) {
+ rewritten = new RewrittenFunctions(1);
+
+ rewritten.firstStageEvals[0] = createSumFunction(function.getArgs());
+ String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+ FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+ rewritten.firstStageTargets[0] = new Target(fieldEval);
+ rewritten.secondStageEvals = createSumFunction(new EvalNode[]{fieldEval});
+
+ } else if (function.getName().equals("max")) {
+ rewritten = new RewrittenFunctions(1);
+
+ rewritten.firstStageEvals[0] = createMaxFunction(function.getArgs());
+ String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+ FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+ rewritten.firstStageTargets[0] = new Target(fieldEval);
+ rewritten.secondStageEvals = createMaxFunction(new EvalNode[]{fieldEval});
+
+ } else if (function.getName().equals("min")) {
+
+ rewritten = new RewrittenFunctions(1);
+
+ rewritten.firstStageEvals[0] = createMinFunction(function.getArgs());
+ String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+ FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+ rewritten.firstStageTargets[0] = new Target(fieldEval);
+ rewritten.secondStageEvals = createMinFunction(new EvalNode[]{fieldEval});
+
+ } else {
+ throw new UnsupportedException("Cannot support a mix of other functions");
}
return rewritten;
@@ -523,7 +520,7 @@ public class GlobalPlanner {
*/
private ExecutionBlock buildGroupByIncludingDistinctFunctionsMultiStage(GlobalPlanContext context,
ExecutionBlock latestExecBlock,
- GroupbyNode groupbyNode) throws PlanningException {
+ GroupbyNode groupbyNode) throws TajoException {
Column [] originalGroupingColumns = groupbyNode.getGroupingColumns();
LinkedHashSet<Column> firstStageGroupingColumns =
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index b70a79f..d16f7d1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -38,6 +38,7 @@ import org.apache.tajo.catalog.CatalogServer;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.LocalCatalogWrapper;
+import org.apache.tajo.catalog.exception.DuplicateDatabaseException;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.FunctionLoader;
@@ -369,7 +370,7 @@ public class TajoMaster extends CompositeService {
}
}
- private void checkBaseTBSpaceAndDatabase() throws IOException {
+ private void checkBaseTBSpaceAndDatabase() throws IOException, DuplicateDatabaseException {
if (!catalog.existTablespace(DEFAULT_TABLESPACE_NAME)) {
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, context.getConf().getVar(ConfVars.WAREHOUSE_DIR));
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
index 40ebf4e..a43b95e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
@@ -28,6 +28,7 @@ import org.apache.tajo.catalog.exception.DuplicateTableException;
import org.apache.tajo.catalog.exception.UndefinedTablespaceException;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.plan.logical.CreateTableNode;
@@ -52,7 +53,7 @@ public class CreateTableExecutor {
}
public TableDesc create(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists)
- throws IOException {
+ throws IOException, TajoException {
TableMeta meta;
if (createTable.hasOptions()) {
@@ -85,7 +86,7 @@ public class CreateTableExecutor {
@Nullable URI uri,
boolean isExternal,
@Nullable PartitionMethodDesc partitionDesc,
- boolean ifNotExists) throws IOException {
+ boolean ifNotExists) throws IOException, TajoException {
Pair<String, String> separatedNames = getQualifiedName(queryContext.getCurrentDatabase(), tableName);
String databaseName = separatedNames.getFirst();
@@ -119,7 +120,7 @@ public class CreateTableExecutor {
}
}
- private TableDesc handlExistence(boolean ifNotExists, String qualifiedName) {
+ private TableDesc handlExistence(boolean ifNotExists, String qualifiedName) throws DuplicateTableException {
if (ifNotExists) {
LOG.info("relation \"" + qualifiedName + "\" is already exists.");
return catalog.getTableDesc(qualifiedName);
@@ -131,13 +132,15 @@ public class CreateTableExecutor {
private Pair<String, String> getQualifiedName(String currentDatabase, String tableName) {
if (CatalogUtil.isFQTableName(tableName)) {
String [] splitted = CatalogUtil.splitFQTableName(tableName);
- return new Pair<String, String>(splitted[0], splitted[1]);
+ return new Pair<>(splitted[0], splitted[1]);
} else {
- return new Pair<String, String>(currentDatabase, tableName);
+ return new Pair<>(currentDatabase, tableName);
}
}
- private Tablespace getTablespaceHandler(@Nullable String tableSpaceName, @Nullable URI tableUri) {
+ private Tablespace getTablespaceHandler(@Nullable String tableSpaceName, @Nullable URI tableUri)
+ throws UndefinedTablespaceException {
+
if (tableSpaceName != null) {
Optional<Tablespace> ts = (Optional<Tablespace>) TablespaceManager.getByName(tableSpaceName);
if (ts.isPresent()) {