You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/23 06:47:39 UTC
tajo git commit: TAJO-751: JDBC driver should support cancel() method.
Repository: tajo
Updated Branches:
refs/heads/branch-0.10.2 3f3f3b7e6 -> 9accd2e49
TAJO-751: JDBC driver should support cancel() method.
Signed-off-by: Jihoon Son <ji...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9accd2e4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9accd2e4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9accd2e4
Branch: refs/heads/branch-0.10.2
Commit: 9accd2e49cc6b778ce96a146be4c78ea2fc5f4d3
Parents: 3f3f3b7
Author: navis.ryu <na...@apache.org>
Authored: Tue Jun 23 13:46:34 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Jun 23 13:46:34 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../org/apache/tajo/client/QueryClientImpl.java | 12 +-
.../apache/tajo/client/SessionConnection.java | 14 +-
.../java/org/apache/tajo/client/TajoClient.java | 3 +
.../org/apache/tajo/client/TajoClientUtil.java | 21 +-
.../org/apache/tajo/jdbc/FetchResultSet.java | 15 +-
.../apache/tajo/jdbc/TajoMemoryResultSet.java | 10 +-
.../org/apache/tajo/jdbc/TajoResultSetBase.java | 19 +-
.../org/apache/tajo/jdbc/WaitingResultSet.java | 71 +++++
.../java/org/apache/tajo/OverridableConf.java | 4 +
.../src/main/java/org/apache/tajo/QueryId.java | 4 +
.../main/java/org/apache/tajo/SessionVars.java | 4 +-
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../java/org/apache/tajo/util/KeyValueSet.java | 59 +++-
.../org/apache/tajo/master/QueryInProgress.java | 10 +-
.../tajo/master/TajoMasterClientService.java | 6 +
.../apache/tajo/querymaster/Repartitioner.java | 4 +-
.../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 37 ++-
.../TestTajoCli/testHelpSessionVars.result | 1 +
.../org/apache/tajo/jdbc/JdbcConnection.java | 6 +-
.../apache/tajo/jdbc/TajoMetaDataResultSet.java | 9 +-
.../apache/tajo/jdbc/TajoPreparedStatement.java | 276 ++-----------------
.../org/apache/tajo/jdbc/TajoStatement.java | 199 ++++++++-----
23 files changed, 394 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index dc9e32e..e13413a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,9 @@ Release 0.10.2 - Released
IMPROVEMENT
+ TAJO-751: JDBC driver should support cancel() method.
+ (Contributed by navis, Committed by jihoon)
+
BUG FIXES
TASKS
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 99c58b6..75d2d3b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -271,17 +271,7 @@ public class QueryClientImpl implements QueryClient {
return createNullResultSet(queryId);
}
- QueryStatus status = getQueryStatus(queryId);
-
- while(status != null && !TajoClientUtil.isQueryComplete(status.getState())) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- status = getQueryStatus(queryId);
- }
+ QueryStatus status = TajoClientUtil.waitCompletion(this, queryId);
if (status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
if (status.hasResult()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 187af33..63edc6b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.annotation.NotNull;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.auth.UserRoleInfo;
import org.apache.tajo.ipc.ClientProtos;
@@ -68,14 +69,14 @@ public class SessionConnection implements Closeable {
volatile TajoIdProtos.SessionIdProto sessionId;
- private AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/** session variable cache */
private final Map<String, String> sessionVarsCache = new HashMap<String, String>();
- private ServiceTracker serviceTracker;
+ private final ServiceTracker serviceTracker;
- private KeyValueSet properties;
+ private final KeyValueSet properties;
/**
* Connect to TajoMaster
@@ -88,14 +89,13 @@ public class SessionConnection implements Closeable {
*/
public SessionConnection(ServiceTracker tracker, @Nullable String baseDatabase,
KeyValueSet properties) throws IOException {
-
+ this.serviceTracker = tracker;
+ this.baseDatabase = baseDatabase;
this.properties = properties;
this.manager = RpcClientManager.getInstance();
this.userInfo = UserRoleInfo.getCurrentUser();
- this.baseDatabase = baseDatabase != null ? baseDatabase : null;
- this.serviceTracker = tracker;
connections.incrementAndGet();
}
@@ -113,7 +113,7 @@ public class SessionConnection implements Closeable {
return manager.getClient(addr, protocolClass, asyncMode);
}
- protected KeyValueSet getProperties() {
+ public KeyValueSet getProperties() {
return properties;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 376f63f..85929e8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -19,9 +19,12 @@
package org.apache.tajo.client;
import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.util.KeyValueSet;
import java.io.Closeable;
@ThreadSafe
public interface TajoClient extends QueryClient, CatalogAdminClient, Closeable {
+
+ KeyValueSet getProperties();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
index ea15aed..bab699e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -18,7 +18,9 @@
package org.apache.tajo.client;
+import com.google.protobuf.ServiceException;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.CatalogUtil;
@@ -27,6 +29,8 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.jdbc.FetchResultSet;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSetBase;
+import org.apache.tajo.rpc.RpcUtils;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import java.io.IOException;
@@ -56,6 +60,21 @@ public class TajoClientUtil {
return !isQueryWaitingForSchedule(state) && !isQueryRunning(state);
}
+ public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) throws ServiceException {
+ QueryStatus status = client.getQueryStatus(queryId);
+
+ while(!isQueryComplete(status.getState())) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ status = client.getQueryStatus(queryId);
+ }
+ return status;
+ }
+
public static ResultSet createResultSet(TajoClient client, QueryId queryId,
ClientProtos.GetQueryResultResponse response, int fetchRows)
throws IOException {
@@ -91,7 +110,7 @@ public class TajoClientUtil {
}
public static ResultSet createNullResultSet() {
- return new TajoMemoryResultSet(null, new Schema(), null, 0, null);
+ return new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), null, 0, null);
}
public static ResultSet createNullResultSet(QueryId queryId) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
index efe070e..869d7c4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -27,26 +27,19 @@ import java.io.IOException;
import java.sql.SQLException;
public class FetchResultSet extends TajoResultSetBase {
- private QueryClient tajoClient;
- private QueryId queryId;
+ protected QueryClient tajoClient;
private int fetchRowNum;
private TajoMemoryResultSet currentResultSet;
- private boolean finished = false;
-// maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit.
+ private boolean finished;
+ // maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit.
private int maxRows;
public FetchResultSet(QueryClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
- super(tajoClient.getClientSideSessionVars());
+ super(queryId, schema, tajoClient.getClientSideSessionVars());
this.tajoClient = tajoClient;
this.maxRows = tajoClient.getMaxRows();
- this.queryId = queryId;
this.fetchRowNum = fetchRowNum;
this.totalRow = Integer.MAX_VALUE;
- this.schema = schema;
- }
-
- public QueryId getQueryId() {
- return queryId;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
index 33cb838..4114d03 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
@@ -31,20 +31,16 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class TajoMemoryResultSet extends TajoResultSetBase {
- private QueryId queryId;
private List<ByteString> serializedTuples;
private AtomicBoolean closed = new AtomicBoolean(false);
private RowStoreUtil.RowStoreDecoder decoder;
public TajoMemoryResultSet(QueryId queryId, Schema schema, List<ByteString> serializedTuples, int maxRowNum,
Map<String, String> clientSideSessionVars) {
- super(clientSideSessionVars);
- this.queryId = queryId;
- this.schema = schema;
+ super(queryId, schema, clientSideSessionVars);
this.totalRow = maxRowNum;
this.serializedTuples = serializedTuples;
this.decoder = RowStoreUtil.createDecoder(schema);
- init();
}
@Override
@@ -53,10 +49,6 @@ public class TajoMemoryResultSet extends TajoResultSetBase {
curRow = 0;
}
- public QueryId getQueryId() {
- return queryId;
- }
-
@Override
public synchronized void close() throws SQLException {
if (closed.getAndSet(true)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
index 77cbbf2..684d7ba 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
@@ -18,6 +18,7 @@
package org.apache.tajo.jdbc;
+import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
@@ -47,7 +48,11 @@ public abstract class TajoResultSetBase implements ResultSet {
protected Schema schema;
protected Tuple cur;
- public TajoResultSetBase(@Nullable Map<String, String> clientSideSessionVars) {
+ protected final QueryId queryId;
+
+ public TajoResultSetBase(QueryId queryId, Schema schema, @Nullable Map<String, String> clientSideSessionVars) {
+ this.queryId = queryId;
+ this.schema = schema;
this.clientSideSessionVars = clientSideSessionVars;
if (clientSideSessionVars != null) {
@@ -73,6 +78,14 @@ public abstract class TajoResultSetBase implements ResultSet {
wasNull = (d instanceof NullDatum);
}
+ protected Schema getSchema() throws SQLException {
+ return schema;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
public Tuple getCurrentTuple() {
return cur;
}
@@ -477,7 +490,7 @@ public abstract class TajoResultSetBase implements ResultSet {
@Override
public int findColumn(String colName) throws SQLException {
- return schema.getColumnIdByName(colName);
+ return getSchema().getColumnIdByName(colName);
}
@Override
@@ -593,7 +606,7 @@ public abstract class TajoResultSetBase implements ResultSet {
@Override
public ResultSetMetaData getMetaData() throws SQLException {
- return new TajoResultSetMetaData(schema);
+ return new TajoResultSetMetaData(getSchema());
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java
new file mode 100644
index 0000000..b9f8df5
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryClient;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.ipc.ClientProtos;
+
+import java.sql.SQLException;
+
+/**
+ * Blocks on schema retrieval if it's not ready
+ */
+public class WaitingResultSet extends FetchResultSet {
+
+ public WaitingResultSet(QueryClient tajoClient, QueryId queryId, int fetchRowNum)
+ throws SQLException {
+ super(tajoClient, null, queryId, fetchRowNum);
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ getSchema();
+ return super.next();
+ }
+
+ @Override
+ protected Schema getSchema() throws SQLException {
+ return schema == null ? schema = waitOnResult() : schema;
+ }
+
+ private Schema waitOnResult() throws SQLException {
+ try {
+ QueryStatus status = TajoClientUtil.waitCompletion(tajoClient, queryId);
+
+ if (status.getState() != TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ throw new ServiceException(status.getErrorMessage() != null ? status.getErrorMessage() :
+ status.getErrorTrace() != null ? status.getErrorTrace() :
+ "Failed to execute query by unknown reason");
+ }
+ ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(queryId);
+ TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc());
+ return tableDesc.getLogicalSchema();
+ } catch (ServiceException e) {
+ throw new SQLException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
index 61bbb5a..c22f054 100644
--- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
@@ -98,6 +98,7 @@ public class OverridableConf extends KeyValueSet {
}
}
+ @Override
public boolean getBool(ConfigKey key) {
return getBool(key, null);
}
@@ -123,6 +124,7 @@ public class OverridableConf extends KeyValueSet {
}
}
+ @Override
public int getInt(ConfigKey key) {
return getInt(key, null);
}
@@ -148,6 +150,7 @@ public class OverridableConf extends KeyValueSet {
}
}
+ @Override
public long getLong(ConfigKey key) {
return getLong(key, null);
}
@@ -173,6 +176,7 @@ public class OverridableConf extends KeyValueSet {
}
}
+ @Override
public float getFloat(ConfigKey key) {
return getLong(key, null);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/QueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
index 85882c1..35ca75c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
@@ -44,6 +44,10 @@ public class QueryId implements Comparable<QueryId> {
return seq;
}
+ public boolean isNull() {
+ return this.equals(QueryIdFactory.NULL_QUERY_ID);
+ }
+
@Override
public String toString() {
return QUERY_ID_PREFIX + SEPARATOR + toStringNoPrefix();
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 15ee73a..4882b27 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -102,7 +102,7 @@ public enum SessionVars implements ConfigKey {
GROUPBY_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME, "shuffle output size for sort (mb)", DEFAULT,
Integer.class, Validators.min("1")),
TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME,
- "shuffle output size for partition table write (mb)", DEFAULT, Long.class, Validators.min("1")),
+ "shuffle output size for partition table write (mb)", DEFAULT, Integer.class, Validators.min("1")),
GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT,
Boolean.class, Validators.bool()),
@@ -130,6 +130,8 @@ public enum SessionVars implements ConfigKey {
// ResultSet ----------------------------------------------------------------
FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT,
Integer.class, Validators.min("0")),
+ BLOCK_ON_RESULT(ConfVars.$RESULT_SET_BLOCK_WAIT, "Whether to block result set on query execution", DEFAULT,
+ Boolean.class, Validators.bool()),
//-------------------------------------------------------------------------------
// Only for Unit Testing
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1cc1240..95ef4bc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -361,6 +361,7 @@ public class TajoConf extends Configuration {
// ResultSet ---------------------------------------------------------
$RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200),
+ $RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true),
;
public final String varname;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 6af0c9e..392552d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -21,7 +21,10 @@ package org.apache.tajo.util;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
+import org.apache.tajo.ConfigKey;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.json.CommonGsonHelper;
import org.apache.tajo.json.GsonObject;
@@ -37,7 +40,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
public static final String FALSE_STR = "false";
@Expose private Map<String,String> keyVals;
-
+
public KeyValueSet() {
keyVals = TUtil.newHashMap();
}
@@ -46,23 +49,23 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
this();
putAll(keyVals);
}
-
+
public KeyValueSet(KeyValueSetProto proto) {
this.keyVals = TUtil.newHashMap();
for(KeyValueProto keyval : proto.getKeyvalList()) {
this.keyVals.put(keyval.getKey(), keyval.getValue());
}
}
-
+
public KeyValueSet(KeyValueSet keyValueSet) {
this();
this.keyVals.putAll(keyValueSet.keyVals);
}
-
+
public static KeyValueSet create() {
return new KeyValueSet();
}
-
+
public static KeyValueSet create(KeyValueSet keyValueSet) {
return new KeyValueSet(keyValueSet);
}
@@ -119,7 +122,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
public boolean getBool(String key, Boolean defaultVal) {
if (containsKey(key)) {
String strVal = get(key, null);
- return strVal != null ? strVal.equalsIgnoreCase(TRUE_STR) : false;
+ return strVal != null && strVal.equalsIgnoreCase(TRUE_STR);
} else if (defaultVal != null) {
return defaultVal;
} else {
@@ -131,6 +134,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
return getBool(key, null);
}
+ public boolean getBool(ConfigKey key) {
+ String keyName = key.keyname();
+ if (key instanceof SessionVars) {
+ return getBool(keyName, ((SessionVars)key).getConfVars().defaultBoolVal);
+ } else if (key instanceof TajoConf.ConfVars) {
+ return getBool(keyName, ((TajoConf.ConfVars)key).defaultBoolVal);
+ }
+ return getBool(keyName);
+ }
+
public void setInt(String key, int val) {
set(key, String.valueOf(val));
}
@@ -150,6 +163,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
return getInt(key, null);
}
+ public int getInt(ConfigKey key) {
+ String keyName = key.keyname();
+ if (key instanceof SessionVars) {
+ return getInt(keyName, ((SessionVars) key).getConfVars().defaultIntVal);
+ } else if (key instanceof TajoConf.ConfVars) {
+ return getInt(keyName, ((TajoConf.ConfVars) key).defaultIntVal);
+ }
+ return getInt(keyName);
+ }
+
public void setLong(String key, long val) {
set(key, String.valueOf(val));
}
@@ -169,6 +192,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
return getLong(key, null);
}
+ public long getLong(ConfigKey key) {
+ String keyName = key.keyname();
+ if (key instanceof SessionVars) {
+ return getLong(keyName, ((SessionVars) key).getConfVars().defaultLongVal);
+ } else if (key instanceof TajoConf.ConfVars) {
+ return getLong(keyName, ((TajoConf.ConfVars) key).defaultLongVal);
+ }
+ return getLong(keyName);
+ }
+
public void setFloat(String key, float val) {
set(key, String.valueOf(val));
}
@@ -185,7 +218,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
throw new IllegalArgumentException("No such a config key: " + key);
}
} else if (defaultVal != null) {
- return defaultVal.floatValue();
+ return defaultVal;
} else {
throw new IllegalArgumentException("No such a config key: " + key);
}
@@ -194,7 +227,17 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
public float getFloat(String key) {
return getFloat(key, null);
}
-
+
+ public float getFloat(ConfigKey key) {
+ String keyName = key.keyname();
+ if (key instanceof SessionVars) {
+ return getFloat(keyName, ((SessionVars) key).getConfVars().defaultFloatVal);
+ } else if (key instanceof TajoConf.ConfVars) {
+ return getFloat(keyName, ((TajoConf.ConfVars) key).defaultFloatVal);
+ }
+ return getFloat(keyName);
+ }
+
public String remove(String key) {
return keyVals.remove(key);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index d2286cf..8cd9d80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -246,7 +246,7 @@ public class QueryInProgress {
// terminal state will let client to retrieve a query result
// So, we must set the query result before changing query state
- if (isFinishState(this.queryInfo.getQueryState())) {
+ if (isFinishState()) {
if (queryInfo.hasResultdesc()) {
this.queryInfo.setResultDesc(queryInfo.getResultDesc());
}
@@ -259,7 +259,13 @@ public class QueryInProgress {
}
}
- private boolean isFinishState(TajoProtos.QueryState state) {
+ public boolean isKillWait() {
+ TajoProtos.QueryState state = queryInfo.getQueryState();
+ return state == TajoProtos.QueryState.QUERY_KILL_WAIT;
+ }
+
+ public boolean isFinishState() {
+ TajoProtos.QueryState state = queryInfo.getQueryState();
return state == TajoProtos.QueryState.QUERY_FAILED ||
state == TajoProtos.QueryState.QUERY_ERROR ||
state == TajoProtos.QueryState.QUERY_KILLED ||
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 418c30b..9a71d2c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -623,6 +623,12 @@ public class TajoMasterClientService extends AbstractService {
try {
context.getSessionManager().touch(request.getSessionId().getId());
QueryId queryId = new QueryId(request.getQueryId());
+
+ QueryInProgress progress = context.getQueryJobManager().getQueryInProgress(queryId);
+ if (progress == null || progress.isFinishState() || progress.isKillWait()) {
+ return BOOL_TRUE;
+ }
+
QueryManager queryManager = context.getQueryJobManager();
queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
new QueryInfo(queryId)));
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 8e9e343..21ab409 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -992,8 +992,8 @@ public class Repartitioner {
public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext,
Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
String tableName) {
- long splitVolume = StorageUnit.MB *
- stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
+ long splitVolume = (long)StorageUnit.MB *
+ stage.getMasterPlan().getContext().getInt(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
long pageSize = ((long)StorageUnit.MB) *
stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
if (pageSize >= splitVolume) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index 36bbd94..b9371ef 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -19,13 +19,12 @@
package org.apache.tajo.jdbc;
import com.google.common.collect.Maps;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
+import org.apache.tajo.*;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.client.QueryClient;
+import org.apache.tajo.client.QueryStatus;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -33,10 +32,7 @@ import org.junit.experimental.categories.Category;
import java.net.InetSocketAddress;
import java.sql.*;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.junit.Assert.*;
@@ -673,4 +669,31 @@ public class TestTajoJdbc extends QueryTestCaseBase {
}
}
}
+
+ @Test
+ public final void testCancel() throws Exception {
+ String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(),
+ DEFAULT_DATABASE_NAME);
+ Properties props = new Properties();
+ props.setProperty(SessionVars.BLOCK_ON_RESULT.keyname(), "false");
+
+ Connection conn = new JdbcConnection(connUri, props);
+ PreparedStatement statement = conn.prepareStatement("select sleep(1) from lineitem");
+ try {
+ assertTrue("should have result set", statement.execute());
+ TajoResultSetBase result = (TajoResultSetBase) statement.getResultSet();
+ Thread.sleep(1000); // todo query master is not killed properly if it's compiling the query (use 100, if you want see)
+ statement.cancel();
+
+ QueryStatus status = client.getQueryStatus(result.getQueryId());
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, status.getState());
+ } finally {
+ if (statement != null) {
+ statement.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index bcd8970..19b2ee1 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -36,4 +36,5 @@ Available Session Variables:
\set CODEGEN [true or false] - Runtime code generation enabled (experiment)
\set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
\set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
+\set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index 85b6af3..e5f2170 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -21,11 +21,13 @@ package org.apache.tajo.jdbc;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.client.CatalogAdminClient;
import org.apache.tajo.client.QueryClient;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.jdbc.util.QueryStringDecoder;
import org.apache.tajo.rpc.RpcUtils;
import org.apache.tajo.util.KeyValueSet;
@@ -53,6 +55,8 @@ public class JdbcConnection implements Connection {
/** it will be used soon. */
private final Map<String, List<String>> params;
+ private final KeyValueSet clientProperties;
+
public JdbcConnection(String rawURI, Properties properties) throws SQLException {
this.rawURI = rawURI;
this.properties = properties;
@@ -99,7 +103,7 @@ public class JdbcConnection implements Connection {
throw new SQLException("Invalid JDBC URI: " + rawURI, "TAJO-001");
}
- KeyValueSet clientProperties = new KeyValueSet();
+ clientProperties = new KeyValueSet();
if(properties != null) {
for(Map.Entry<Object, Object> entry: properties.entrySet()) {
clientProperties.set(entry.getKey().toString(), entry.getValue().toString());
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
index 8f5bed6..1813377 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
@@ -31,17 +31,12 @@ public class TajoMetaDataResultSet extends TajoResultSetBase {
private List<MetaDataTuple> values;
public TajoMetaDataResultSet(Schema schema, List<MetaDataTuple> values) {
- super(null);
- init();
- this.schema = schema;
+ super(null, schema, null);
setDataTuples(values);
}
public TajoMetaDataResultSet(List<String> columns, List<Type> types, List<MetaDataTuple> values) {
- super(null);
- init();
- schema = new Schema();
-
+ super(null, new Schema(), null);
int index = 0;
if(columns != null) {
for(String columnName: columns) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
index 229587a..0574bf9 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
@@ -30,35 +30,16 @@ import java.util.HashMap;
* TajoPreparedStatement.
*
*/
-public class TajoPreparedStatement implements PreparedStatement {
- private JdbcConnection conn;
+public class TajoPreparedStatement extends TajoStatement implements PreparedStatement {
+
private final String sql;
- private TajoClient tajoClient;
+
/**
* save the SQL parameters {paramLoc:paramValue}
*/
private final HashMap<Integer, String> parameters=new HashMap<Integer, String>();
/**
- * We need to keep a reference to the result set to support the following:
- * <code>
- * statement.execute(String sql);
- * statement.getResultSet();
- * </code>.
- */
- private ResultSet resultSet = null;
-
- /**
- * Add SQLWarnings to the warningChain if needed.
- */
- //private SQLWarning warningChain = null;
-
- /**
- * Keep state so we can fail certain calls made after close().
- */
- private boolean isClosed = false;
-
- /**
* keep the current ResultRet update count
*/
private int updateCount = 0;
@@ -69,8 +50,7 @@ public class TajoPreparedStatement implements PreparedStatement {
public TajoPreparedStatement(JdbcConnection conn,
TajoClient tajoClient,
String sql) {
- this.conn = conn;
- this.tajoClient = tajoClient;
+ super(conn, tajoClient);
this.sql = sql;
}
@@ -81,6 +61,7 @@ public class TajoPreparedStatement implements PreparedStatement {
@Override
public void clearParameters() throws SQLException {
+ checkConnection("Can't clear parameters");
this.parameters.clear();
}
@@ -101,22 +82,14 @@ public class TajoPreparedStatement implements PreparedStatement {
return updateCount;
}
- protected ResultSet executeImmediate(String sql) throws SQLException {
- if (isClosed) {
- throw new SQLException("Can't execute after statement has been closed");
- }
+ protected TajoResultSetBase executeImmediate(String sql) throws SQLException {
+ checkConnection("Can't execute");
try {
if (sql.contains("?")) {
sql = updateSql(sql, parameters);
}
- if (TajoStatement.isSetVariableQuery(sql)) {
- return TajoStatement.setSessionVariable(tajoClient, sql);
- } else if (TajoStatement.isUnSetVariableQuery(sql)) {
- return TajoStatement.unSetSessionVariable(tajoClient, sql);
- } else {
- return tajoClient.executeQueryAndGetResult(sql);
- }
+ return (TajoResultSetBase) executeSQL(sql);
} catch (Exception e) {
throw new SQLException(e.getMessage(), e);
}
@@ -131,7 +104,7 @@ public class TajoPreparedStatement implements PreparedStatement {
*/
private String updateSql(final String sql, HashMap<Integer, String> parameters) {
- StringBuffer newSql = new StringBuffer(sql);
+ StringBuilder newSql = new StringBuilder(sql);
int paramLoc = 1;
while (getCharIndexFromSqlByParamLocation(sql, '?', paramLoc) > 0) {
@@ -179,6 +152,7 @@ public class TajoPreparedStatement implements PreparedStatement {
@Override
public ResultSetMetaData getMetaData() throws SQLException {
+ checkConnection("Can't get metadata");
if(resultSet != null) {
return resultSet.getMetaData();
} else {
@@ -249,6 +223,7 @@ public class TajoPreparedStatement implements PreparedStatement {
@Override
public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+ checkConnection("Can't set parameters");
this.parameters.put(parameterIndex, "" + x);
}
@@ -306,21 +281,25 @@ public class TajoPreparedStatement implements PreparedStatement {
@Override
public void setDouble(int parameterIndex, double x) throws SQLException {
+ checkConnection("Can't set parameters");
this.parameters.put(parameterIndex,"" + x);
}
@Override
public void setFloat(int parameterIndex, float x) throws SQLException {
+ checkConnection("Can't set parameters");
this.parameters.put(parameterIndex,"" + x);
}
@Override
public void setInt(int parameterIndex, int x) throws SQLException {
+ checkConnection("Can't set parameters");
this.parameters.put(parameterIndex,"" + x);
}
@Override
public void setLong(int parameterIndex, long x) throws SQLException {
+ checkConnection("Can't set parameters");
this.parameters.put(parameterIndex,"" + x);
}
@@ -399,11 +378,13 @@ public class TajoPreparedStatement implements PreparedStatement {
@Override
public void setShort(int parameterIndex, short x) throws SQLException {
+ checkConnection("Can't set parameters");
this.parameters.put(parameterIndex,"" + x);
}
@Override
public void setString(int parameterIndex, String x) throws SQLException {
+ checkConnection("Can't set parameters");
x=x.replace("'", "\\'");
this.parameters.put(parameterIndex,"'" + x +"'");
}
@@ -439,227 +420,4 @@ public class TajoPreparedStatement implements PreparedStatement {
throws SQLException {
throw new SQLFeatureNotSupportedException("setUnicodeStream not supported");
}
-
- @Override
- public void addBatch(String sql) throws SQLException {
- throw new SQLFeatureNotSupportedException("addBatch not supported");
- }
-
- @Override
- public void cancel() throws SQLException {
- throw new SQLFeatureNotSupportedException("cancel not supported");
- }
-
- @Override
- public void clearBatch() throws SQLException {
- throw new SQLFeatureNotSupportedException("clearBatch not supported");
- }
-
- @Override
- public void clearWarnings() throws SQLException {
- }
-
- public void closeOnCompletion() throws SQLException {
- // JDK 1.7
- throw new SQLFeatureNotSupportedException("closeOnCompletion");
- }
-
- @Override
- public void close() throws SQLException {
- if (resultSet!=null) {
- resultSet.close();
- resultSet = null;
- }
- isClosed = true;
- }
-
- @Override
- public boolean execute(String sql) throws SQLException {
- throw new SQLFeatureNotSupportedException("execute(sql) not supported");
- }
-
- @Override
- public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLFeatureNotSupportedException("execute(sql) not supported");
- }
-
- @Override
- public boolean execute(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLFeatureNotSupportedException("execute(sql) not supported");
- }
-
- @Override
- public boolean execute(String sql, String[] columnNames) throws SQLException {
- throw new SQLFeatureNotSupportedException("execute(sql) not supported");
- }
-
- @Override
- public int[] executeBatch() throws SQLException {
- throw new SQLFeatureNotSupportedException("executeBatch not supported");
- }
-
- @Override
- public ResultSet executeQuery(String sql) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeQuery(sql) not supported");
- }
-
- @Override
- public int executeUpdate(String sql) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeUpdate not supported");
- }
-
- @Override
- public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeUpdate not supported");
- }
-
- @Override
- public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeUpdate not supported");
- }
-
- @Override
- public int executeUpdate(String sql, String[] columnNames) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeUpdate not supported");
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- return conn;
- }
-
- @Override
- public int getFetchDirection() throws SQLException {
- throw new SQLFeatureNotSupportedException("getFetchDirection not supported");
- }
-
- @Override
- public int getFetchSize() throws SQLException {
- throw new SQLFeatureNotSupportedException("getFetchSize not supported");
- }
-
- @Override
- public ResultSet getGeneratedKeys() throws SQLException {
- throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported");
- }
-
- @Override
- public int getMaxFieldSize() throws SQLException {
- throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported");
- }
-
- @Override
- public int getMaxRows() throws SQLException {
- throw new SQLFeatureNotSupportedException("getMaxRows not supported");
- }
-
- @Override
- public boolean getMoreResults() throws SQLException {
- throw new SQLFeatureNotSupportedException("getMoreResults not supported");
- }
-
- @Override
- public boolean getMoreResults(int current) throws SQLException {
- throw new SQLFeatureNotSupportedException("getMoreResults not supported");
- }
-
- @Override
- public int getQueryTimeout() throws SQLException {
- throw new SQLFeatureNotSupportedException("getQueryTimeout not supported");
- }
-
- @Override
- public ResultSet getResultSet() throws SQLException {
- return this.resultSet;
- }
-
- @Override
- public int getResultSetConcurrency() throws SQLException {
- throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported");
- }
-
- @Override
- public int getResultSetHoldability() throws SQLException {
- throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported");
- }
-
- @Override
- public int getResultSetType() throws SQLException {
- throw new SQLFeatureNotSupportedException("getResultSetType not supported");
- }
-
- @Override
- public int getUpdateCount() throws SQLException {
- return updateCount;
- }
-
- @Override
- public SQLWarning getWarnings() throws SQLException {
- return null;
- }
-
- @Override
- public boolean isClosed() throws SQLException {
- return isClosed;
- }
-
- public boolean isCloseOnCompletion() throws SQLException {
- //JDK 1.7
- throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported");
- }
-
- @Override
- public boolean isPoolable() throws SQLException {
- throw new SQLFeatureNotSupportedException("isPoolable not supported");
- }
-
- @Override
- public void setCursorName(String name) throws SQLException {
- throw new SQLFeatureNotSupportedException("setCursorName not supported");
- }
-
- @Override
- public void setEscapeProcessing(boolean enable) throws SQLException {
- throw new SQLFeatureNotSupportedException("setEscapeProcessing not supported");
- }
-
- @Override
- public void setFetchDirection(int direction) throws SQLException {
- throw new SQLFeatureNotSupportedException("setFetchDirection not supported");
- }
-
- @Override
- public void setFetchSize(int rows) throws SQLException {
- throw new SQLFeatureNotSupportedException("setFetchSize not supported");
- }
-
- @Override
- public void setMaxFieldSize(int max) throws SQLException {
- throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported");
- }
-
- @Override
- public void setMaxRows(int max) throws SQLException {
- throw new SQLFeatureNotSupportedException("setMaxRows not supported");
- }
-
- @Override
- public void setPoolable(boolean poolable) throws SQLException {
- throw new SQLFeatureNotSupportedException("setPoolable not supported");
- }
-
- @Override
- public void setQueryTimeout(int seconds) throws SQLException {
- throw new SQLFeatureNotSupportedException("setQueryTimeout not supported");
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- throw new SQLFeatureNotSupportedException("isWrapperFor not supported");
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- throw new SQLFeatureNotSupportedException("unwrap not supported");
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index 820e350..0f80ddf 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -19,17 +19,21 @@ package org.apache.tajo.jdbc;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.ipc.ClientProtos;
+import java.io.IOException;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
public class TajoStatement implements Statement {
- private JdbcConnection conn;
- private TajoClient tajoClient;
- private int fetchSize = 200;
+ protected JdbcConnection conn;
+ protected TajoClient tajoClient;
+ protected int fetchSize = SessionVars.FETCH_ROWNUM.getConfVars().defaultIntVal;
/**
* We need to keep a reference to the result set to support the following:
@@ -38,35 +42,65 @@ public class TajoStatement implements Statement {
* statement.getResultSet();
* </code>.
*/
- private ResultSet resultSet = null;
+ protected TajoResultSetBase resultSet = null;
+
+ /**
+ * Add SQLWarnings to the warningChain if needed.
+ */
+ protected SQLWarning warningChain = null;
/**
* Keep state so we can fail certain calls made after close().
*/
- private boolean isClosed = false;
+ private boolean isClosed;
+
+ private boolean blockWait;
public TajoStatement(JdbcConnection conn, TajoClient tajoClient) {
this.conn = conn;
this.tajoClient = tajoClient;
+ this.blockWait = tajoClient.getProperties().getBool(SessionVars.BLOCK_ON_RESULT);
}
+ /*
+ * NOTICE
+ *
+ * For unimplemented methods, this class throws an exception or prints an error message.
+ * If the unimplemented method can cause unexpected result to user application when it is called,
+ * it should throw an exception.
+ * Otherwise, it is enough that prints an error message.
+ */
+
@Override
public void addBatch(String sql) throws SQLException {
- throw new SQLFeatureNotSupportedException("addBatch not supported");
+ throw new SQLFeatureNotSupportedException("addBatch() is not supported yet.");
}
@Override
public void cancel() throws SQLException {
- throw new SQLFeatureNotSupportedException("cancel not supported");
+ checkConnection("Can't cancel query");
+ if (resultSet == null || resultSet.getQueryId().isNull()) {
+ return;
+ }
+ try {
+ tajoClient.killQuery(resultSet.getQueryId());
+ } catch (Exception e) {
+ throw new SQLException(e);
+ } finally {
+ resultSet = null;
+ }
}
@Override
public void clearBatch() throws SQLException {
- throw new SQLFeatureNotSupportedException("clearBatch not supported");
+ throw new SQLFeatureNotSupportedException("clearBatch() is not supported yet.");
}
@Override
- public void clearWarnings() throws SQLException {}
+ public void clearWarnings() throws SQLException {
+ checkConnection("Can't clear warnings");
+ warningChain = null;
+ }
@Override
public void close() throws SQLException {
@@ -79,55 +113,87 @@ public class TajoStatement implements Statement {
public void closeOnCompletion() throws SQLException {
// JDK 1.7
- throw new SQLFeatureNotSupportedException("closeOnCompletion not supported");
+ throw new SQLFeatureNotSupportedException("closeOnCompletion() is not supported yet.");
}
@Override
public boolean execute(String sql) throws SQLException {
- resultSet = executeQuery(sql);
+ resultSet = (TajoResultSetBase) executeQuery(sql);
return resultSet != null;
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLFeatureNotSupportedException("execute not supported");
+ throw new SQLFeatureNotSupportedException("execute() is not supported yet.");
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLFeatureNotSupportedException("execute not supported");
+ throw new SQLFeatureNotSupportedException("execute() is not supported yet.");
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
- throw new SQLFeatureNotSupportedException("execute not supported");
+ throw new SQLFeatureNotSupportedException("execute() is not supported yet.");
}
@Override
public int[] executeBatch() throws SQLException {
- throw new SQLFeatureNotSupportedException("executeBatch not supported");
+ throw new SQLFeatureNotSupportedException("executeBatch() is not supported yet.");
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- if (isClosed) {
- throw new SQLException("Can't execute after statement has been closed");
- }
+ checkConnection("Can't execute");
try {
- if (isSetVariableQuery(sql)) {
- return setSessionVariable(tajoClient, sql);
- } else if (isUnSetVariableQuery(sql)) {
- return unSetSessionVariable(tajoClient, sql);
- } else {
- return tajoClient.executeQueryAndGetResult(sql);
- }
+ return executeSQL(sql);
} catch (Exception e) {
throw new SQLException(e.getMessage(), e);
}
}
+ protected ResultSet executeSQL(String sql) throws SQLException, ServiceException, IOException {
+ if (isSetVariableQuery(sql)) {
+ return setSessionVariable(tajoClient, sql);
+ }
+ if (isUnSetVariableQuery(sql)) {
+ return unSetSessionVariable(tajoClient, sql);
+ }
+
+ ClientProtos.SubmitQueryResponse response = tajoClient.executeQuery(sql);
+ if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+ if (response.hasErrorMessage()) {
+ throw new ServiceException(response.getErrorMessage());
+ }
+ if (response.hasErrorTrace()) {
+ throw new ServiceException(response.getErrorTrace());
+ }
+ throw new ServiceException("Failed to submit query by unknown reason");
+ }
+
+ QueryId queryId = new QueryId(response.getQueryId());
+ if (response.getIsForwarded() && !queryId.isNull()) {
+ WaitingResultSet result = new WaitingResultSet(tajoClient, queryId, fetchSize);
+ if (blockWait) {
+ result.getSchema();
+ }
+ return result;
+ }
+
+ if (response.hasResultSet() || response.hasTableDesc()) {
+ return TajoClientUtil.createResultSet(tajoClient, response, fetchSize);
+ }
+ return TajoClientUtil.createNullResultSet(queryId);
+ }
+
+ protected void checkConnection(String errorMsg) throws SQLException {
+ if (isClosed) {
+ throw new SQLException(errorMsg + " after statement has been closed");
+ }
+ }
+
public static boolean isSetVariableQuery(String sql) {
if (sql == null || sql.trim().isEmpty()) {
return false;
@@ -144,7 +210,7 @@ public class TajoStatement implements Statement {
return sql.trim().toLowerCase().startsWith("unset");
}
- public static ResultSet setSessionVariable(TajoClient client, String sql) throws SQLException {
+ private ResultSet setSessionVariable(TajoClient client, String sql) throws SQLException {
int index = sql.toLowerCase().indexOf("set");
if (index < 0) {
throw new SQLException("SET statement should be started 'SET' keyword: " + sql);
@@ -165,7 +231,7 @@ public class TajoStatement implements Statement {
return TajoClientUtil.createNullResultSet();
}
- public static ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException {
+ private ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException {
int index = sql.toLowerCase().indexOf("unset");
if (index < 0) {
throw new SQLException("UNSET statement should be started 'UNSET' keyword: " + sql);
@@ -186,57 +252,57 @@ public class TajoStatement implements Statement {
@Override
public int executeUpdate(String sql) throws SQLException {
+ checkConnection("Can't execute update");
try {
tajoClient.executeQuery(sql);
return 1;
} catch (Exception ex) {
- throw new SQLFeatureNotSupportedException(ex.toString());
+ throw new SQLException(ex.toString());
}
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+ throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet.");
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+ throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet.");
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
- throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+ throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet.");
}
@Override
public Connection getConnection() throws SQLException {
- if (isClosed)
- throw new SQLException("Can't get connection after statement has been closed");
+ checkConnection("Can't get connection");
return conn;
}
@Override
public int getFetchDirection() throws SQLException {
- throw new SQLFeatureNotSupportedException("getFetchDirection not supported");
+ checkConnection("Can't get fetch direction");
+ return ResultSet.FETCH_FORWARD;
}
@Override
public int getFetchSize() throws SQLException {
- if (isClosed)
- throw new SQLException("Can't get fetch size after statement has been closed");
+ checkConnection("Can't get fetch size");
return fetchSize;
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
- throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported");
+ throw new SQLFeatureNotSupportedException("getGeneratedKeys() is not supported yet.");
}
@Override
public int getMaxFieldSize() throws SQLException {
- throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported");
+ throw new SQLFeatureNotSupportedException("getMaxFieldSize() is not supported yet.");
}
@Override
@@ -246,53 +312,54 @@ public class TajoStatement implements Statement {
@Override
public boolean getMoreResults() throws SQLException {
- throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+ throw new SQLFeatureNotSupportedException("getMoreResults() is not supported yet.");
}
@Override
public boolean getMoreResults(int current) throws SQLException {
- throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+ throw new SQLFeatureNotSupportedException("getMoreResults() is not supported yet.");
}
@Override
public int getQueryTimeout() throws SQLException {
- throw new SQLFeatureNotSupportedException("getQueryTimeout not supported");
+ System.err.println("getResultSetConcurrency() is not supported yet.");
+ return -1;
}
@Override
public ResultSet getResultSet() throws SQLException {
- if (isClosed)
- throw new SQLException("Can't get result set after statement has been closed");
+ checkConnection("Can't get result set");
return resultSet;
}
@Override
public int getResultSetConcurrency() throws SQLException {
- throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported");
+ System.err.println("getResultSetConcurrency() is not supported yet.");
+ return -1;
}
@Override
public int getResultSetHoldability() throws SQLException {
- throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported");
+ System.err.println("getResultSetHoldability() is not supported yet.");
+ return -1;
}
@Override
public int getResultSetType() throws SQLException {
- throw new SQLFeatureNotSupportedException("getResultSetType not supported");
+ System.err.println("getResultSetType() is not supported yet.");
+ return -1;
}
@Override
public int getUpdateCount() throws SQLException {
- if (isClosed)
- throw new SQLException("Can't get update count after statement has been closed");
- return 0;
+ System.err.println("getResultSetType() is not supported yet.");
+ return -1;
}
@Override
public SQLWarning getWarnings() throws SQLException {
- if (isClosed)
- throw new SQLException("Can't get warnings after statement has been closed");
- return null;
+ checkConnection("Can't get warnings");
+ return warningChain;
}
@Override
@@ -302,40 +369,41 @@ public class TajoStatement implements Statement {
public boolean isCloseOnCompletion() throws SQLException {
// JDK 1.7
- throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported");
+ throw new SQLFeatureNotSupportedException("isCloseOnCompletion() is not supported yet.");
}
@Override
public boolean isPoolable() throws SQLException {
- throw new SQLFeatureNotSupportedException("isPoolable not supported");
+ throw new SQLFeatureNotSupportedException("isPoolable() is not supported yet.");
}
@Override
public void setCursorName(String name) throws SQLException {
- throw new SQLFeatureNotSupportedException("setCursorName not supported");
+ System.err.println("setCursorName() is not supported yet.");
}
/**
* Not necessary.
*/
@Override
- public void setEscapeProcessing(boolean enable) throws SQLException {}
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ System.err.println("setEscapeProcessing() is not supported yet.");
+ }
@Override
public void setFetchDirection(int direction) throws SQLException {
- throw new SQLFeatureNotSupportedException("setFetchDirection not supported");
+ System.err.println("setFetchDirection() is not supported yet.");
}
@Override
public void setFetchSize(int rows) throws SQLException {
- if (isClosed)
- throw new SQLException("Can't set fetch size after statement has been closed");
+ checkConnection("Can't set fetch size");
fetchSize = rows;
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
- throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported");
+ System.err.println("setMaxFieldSize() is not supported yet.");
}
@Override
@@ -348,22 +416,23 @@ public class TajoStatement implements Statement {
@Override
public void setPoolable(boolean poolable) throws SQLException {
- throw new SQLFeatureNotSupportedException("setPoolable not supported");
+ System.err.println("setPoolable() is not supported yet.");
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
- throw new SQLFeatureNotSupportedException("setQueryTimeout not supported");
+ System.err.println("setQueryTimeout() is not supported yet.");
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
- throw new SQLFeatureNotSupportedException("isWrapperFor not supported");
+ System.err.println("isWrapperFor() is not supported yet.");
+ return false;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
- throw new SQLFeatureNotSupportedException("unwrap not supported");
+ System.err.println("unwrap() is not supported yet.");
+ return null;
}
-
}