You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/07 18:36:00 UTC
[1/6] tajo git commit: TAJO-1583: Remove ServerCallable in RPC client. (jinho)
Repository: tajo
Updated Branches:
refs/heads/index_support 86c97b2a1 -> 42bcf2de0
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index be757af..84decd5 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -32,19 +32,18 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
-import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.ProtoUtil;
import java.io.Closeable;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
@@ -53,7 +52,7 @@ import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProto
public class SessionConnection implements Closeable {
- private final Log LOG = LogFactory.getLog(TajoClientImpl.class);
+ private final static Log LOG = LogFactory.getLog(SessionConnection.class);
final RpcClientManager manager;
@@ -70,6 +69,8 @@ public class SessionConnection implements Closeable {
private ServiceTracker serviceTracker;
+ private NettyClientBase client;
+
private KeyValueSet properties;
/**
@@ -88,27 +89,34 @@ public class SessionConnection implements Closeable {
this.manager = RpcClientManager.getInstance();
this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
- this.manager.setTimeoutSeconds(
- properties.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 0)); // disable rpc timeout
-
this.userInfo = UserRoleInfo.getCurrentUser();
this.baseDatabase = baseDatabase != null ? baseDatabase : null;
this.serviceTracker = tracker;
+ try {
+ this.client = getTajoMasterConnection();
+ } catch (ServiceException e) {
+ throw new IOException(e);
+ }
}
public Map<String, String> getClientSideSessionVars() {
return Collections.unmodifiableMap(sessionVarsCache);
}
- public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
- ConnectException, ClassNotFoundException {
- return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
- }
-
- public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode)
- throws NoSuchMethodException, ConnectException, ClassNotFoundException {
- return manager.getClient(addr, protocolClass, asyncMode);
+ public synchronized NettyClientBase getTajoMasterConnection() throws ServiceException {
+ if (client != null && client.isConnected()) return client;
+ else {
+ try {
+ RpcClientManager.cleanup(client);
+ // The client is not closed when idle, in order to support high availability
+ this.client = manager.newClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false,
+ manager.getRetries(), 0, TimeUnit.SECONDS, false);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ return client;
+ }
}
protected KeyValueSet getProperties() {
@@ -129,10 +137,9 @@ public class SessionConnection implements Closeable {
}
public boolean isConnected() {
- if(!closed.get()){
+ if (!closed.get()) {
try {
- return manager.getClient(serviceTracker.getClientServiceAddress(),
- TajoMasterClientProtocol.class, false).isConnected();
+ return getTajoMasterConnection().isConnected();
} catch (Throwable e) {
return false;
}
@@ -145,64 +152,51 @@ public class SessionConnection implements Closeable {
}
public String getCurrentDatabase() throws ServiceException {
- return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public String call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
- }
- }.withRetries();
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
}
public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException {
- return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- KeyValueSet keyValueSet = new KeyValueSet();
- keyValueSet.putAll(variables);
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .setSessionVars(keyValueSet.getProto()).build();
-
- SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
-
- if (response.getResultCode() == ResultCode.OK) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw new ServiceException(response.getMessage());
- }
- }
- }.withRetries();
- }
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
- public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
- return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.putAll(variables);
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .setSessionVars(keyValueSet.getProto()).build();
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
+ SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .addAllUnsetVariables(variables).build();
+ if (response.getResultCode() == ResultCode.OK) {
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
+ } else {
+ throw new ServiceException(response.getMessage());
+ }
+ }
- SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+ public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
- if (response.getResultCode() == ResultCode.OK) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw new ServiceException(response.getMessage());
- }
- }
- }.withRetries();
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .addAllUnsetVariables(variables).build();
+
+ SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+
+ if (response.getResultCode() == ResultCode.OK) {
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
+ } else {
+ throw new ServiceException(response.getMessage());
+ }
}
void updateSessionVarsCache(Map<String, String> variables) {
@@ -213,35 +207,26 @@ public class SessionConnection implements Closeable {
}
public String getSessionVariable(final String varname) throws ServiceException {
- return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public String call(NettyClientBase client) throws ServiceException {
-
- synchronized (sessionVarsCache) {
- // If a desired variable is client side one and exists in the cache, immediately return the variable.
- if (sessionVarsCache.containsKey(varname)) {
- return sessionVarsCache.get(varname);
- }
- }
+ synchronized (sessionVarsCache) {
+ // If a desired variable is client side one and exists in the cache, immediately return the variable.
+ if (sessionVarsCache.containsKey(varname)) {
+ return sessionVarsCache.get(varname);
+ }
+ }
- checkSessionAndGet(client);
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
- }
- }.withRetries();
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
}
public Boolean existSessionVariable(final String varname) throws ServiceException {
- return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
- }
- }.withRetries();
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
}
public Map<String, String> getCachedAllSessionVariables() {
@@ -251,29 +236,19 @@ public class SessionConnection implements Closeable {
}
public Map<String, String> getAllSessionVariables() throws ServiceException {
- return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class,
- false) {
-
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId));
- }
- }.withRetries();
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId));
}
public Boolean selectDatabase(final String databaseName) throws ServiceException {
- Boolean selected = new ServerCallable<Boolean>(manager, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
- }
- }.withRetries();
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ boolean selected = tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
if (selected) {
this.baseDatabase = databaseName;
@@ -283,14 +258,14 @@ public class SessionConnection implements Closeable {
@Override
public void close() {
- if(closed.getAndSet(true)){
+ if (closed.getAndSet(true)) {
return;
}
// remove session
NettyClientBase client = null;
try {
- client = manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+ client = getTajoMasterConnection();
TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
tajoMaster.removeSession(null, sessionId);
} catch (Throwable e) {
@@ -333,55 +308,51 @@ public class SessionConnection implements Closeable {
}
public boolean reconnect() throws Exception {
- return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
- builder.setUsername(userInfo.getUserName()).build();
- if (baseDatabase != null) {
- builder.setBaseDatabaseName(baseDatabase);
- }
+ CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+ builder.setUsername(userInfo.getUserName()).build();
+ if (baseDatabase != null) {
+ builder.setBaseDatabaseName(baseDatabase);
+ }
+ NettyClientBase client = getTajoMasterConnection();
- // create new session
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
- if (response.getResultCode() != ResultCode.OK) {
- return false;
- }
+ // create new session
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
+ if (response.getResultCode() != ResultCode.OK) {
+ return false;
+ }
- // Invalidate some session variables in client cache
- sessionId = response.getSessionId();
- Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
- synchronized (sessionVarsCache) {
- for (SessionVars var : UPDATE_ON_RECONNECT) {
- String value = sessionVars.get(var.keyname());
- if (value != null) {
- sessionVarsCache.put(var.keyname(), value);
- }
- }
+ // Invalidate some session variables in client cache
+ sessionId = response.getSessionId();
+ Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+ synchronized (sessionVarsCache) {
+ for (SessionVars var : UPDATE_ON_RECONNECT) {
+ String value = sessionVars.get(var.keyname());
+ if (value != null) {
+ sessionVarsCache.put(var.keyname(), value);
}
+ }
+ }
- // Update the session variables in server side
- try {
- KeyValueSet keyValueSet = new KeyValueSet();
- keyValueSet.putAll(sessionVarsCache);
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .setSessionVars(keyValueSet.getProto()).build();
-
- if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
- tajoMasterService.removeSession(null, sessionId);
- return false;
- }
- LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
- return true;
- } catch (ServiceException e) {
- tajoMasterService.removeSession(null, sessionId);
- return false;
- }
+ // Update the session variables in server side
+ try {
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.putAll(sessionVarsCache);
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .setSessionVars(keyValueSet.getProto()).build();
+
+ if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
}
- }.withRetries();
+ LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+ return true;
+ } catch (ServiceException e) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
index df709c5..0bb11e0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
@@ -88,7 +88,6 @@ public class TestDefaultCliOutputFormatter {
String multiLineMessage =
"ERROR: java.sql.SQLException: ERROR: no such a table: table1\n" +
"com.google.protobuf.ServiceException: java.sql.SQLException: ERROR: no such a table: table1\n" +
- "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:107)\n" +
"\tat org.apache.tajo.client.TajoClient.getTableDesc(TajoClient.java:777)\n" +
"\tat org.apache.tajo.cli.tsql.commands.DescTableCommand.invoke(DescTableCommand.java:43)\n" +
"\tat org.apache.tajo.cli.tsql.TajoCli.executeMetaCommand(TajoCli.java:300)\n" +
@@ -96,9 +95,6 @@ public class TestDefaultCliOutputFormatter {
"\tat org.apache.tajo.cli.tsql.TajoCli.runShell(TajoCli.java:271)\n" +
"\tat org.apache.tajo.cli.tsql.TajoCli.main(TajoCli.java:420)\n" +
"Caused by: java.sql.SQLException: ERROR: no such a table: table1\n" +
- "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:791)\n" +
- "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:778)\n" +
- "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:97)\n" +
"\t... 6 more";
assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(multiLineMessage));
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index b2e1ce9..b1a27fa 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -34,10 +34,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.master.event.QueryEvent;
-import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.master.event.StageEvent;
-import org.apache.tajo.master.event.StageEventType;
+import org.apache.tajo.master.event.*;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -131,8 +128,7 @@ public class TestKillQuery {
assertNotNull(stage);
// fire kill event
- Query q = queryMasterTask.getQuery();
- q.handle(new QueryEvent(queryId, QueryEventType.KILL));
+ queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
try {
cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
@@ -157,24 +153,55 @@ public class TestKillQuery {
@Test
public final void testIgnoreStageStateFromKilled() throws Exception {
- ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
- QueryId queryId = new QueryId(res.getQueryId());
- cluster.waitForQuerySubmitted(queryId);
+ SQLAnalyzer analyzer = new SQLAnalyzer();
+ QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+ Session session = LocalTajoTestingUtility.createDummySession();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+
+ LogicalPlanner planner = new LogicalPlanner(catalog);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ Expr expr = analyzer.parse(queryStr);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+ optimizer.optimize(plan);
+
+ QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+ QueryContext queryContext = new QueryContext(conf);
+ MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+ GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+ globalPlanner.build(masterPlan);
- QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
- Query query = qmt.getQuery();
+ CountDownLatch barrier = new CountDownLatch(1);
+ MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+
+ QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+ QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+ queryId, session, defaultContext, expr.toJson(), dispatch);
- // wait for a stage created
- cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
- query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+ queryMasterTask.init(conf);
+ queryMasterTask.getQueryTaskContext().getDispatcher().start();
+ queryMasterTask.startQuery();
try{
- cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
- } finally {
- assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+ barrier.await(5000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+ }
+
+ Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+ assertNotNull(stage);
+
+ // fire kill event
+ queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+ try {
+ cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+ } finally {
+ queryMasterTask.stop();
}
- List<Stage> stages = Lists.newArrayList(query.getStages());
+ List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
Stage lastStage = stages.get(stages.size() - 1);
assertEquals(StageState.KILLED, lastStage.getSynchronizedState());
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 190beae..8f6f9ed 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -252,11 +252,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
@Override
public void close() {
- getHandler().sendExceptions(getClass().getSimpleName() + "terminates all the connections");
-
Channel channel = getChannel();
if (channel != null && channel.isOpen()) {
LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+ /* channelInactive receives event and then client terminates all the requests */
channel.close().syncUninterruptibly();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
deleted file mode 100644
index 3c054ad..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-public class RetriesExhaustedException extends RuntimeException {
- private static final long serialVersionUID = 1876775844L;
-
- public RetriesExhaustedException(final String msg) {
- super(msg);
- }
-
- public RetriesExhaustedException(final String msg, final IOException e) {
- super(msg, e);
- }
-
- /**
- * Datastructure that allows adding more info around Throwable incident.
- */
- public static class ThrowableWithExtraContext {
- private final Throwable t;
- private final long when;
- private final String extras;
-
- public ThrowableWithExtraContext(final Throwable t, final long when,
- final String extras) {
- this.t = t;
- this.when = when;
- this.extras = extras;
- }
-
- @Override
- public String toString() {
- return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
- }
- }
-
- /**
- * Create a new RetriesExhaustedException from the list of prior failures.
- * @param callableVitals Details from the {@link ServerCallable} we were using
- * when we got this exception.
- * @param numTries The number of tries we made
- * @param exceptions List of exceptions that failed before giving up
- */
- public RetriesExhaustedException(final String callableVitals, int numTries,
- List<Throwable> exceptions) {
- super(getMessage(callableVitals, numTries, exceptions));
- }
-
- /**
- * Create a new RetriesExhaustedException from the list of prior failures.
- * @param numTries
- * @param exceptions List of exceptions that failed before giving up
- */
- public RetriesExhaustedException(final int numTries,
- final List<Throwable> exceptions) {
- super(getMessage(numTries, exceptions));
- }
-
- private static String getMessage(String callableVitals, int numTries,
- List<Throwable> exceptions) {
- StringBuilder buffer = new StringBuilder("Failed contacting ");
- buffer.append(callableVitals);
- buffer.append(" after ");
- buffer.append(numTries + 1);
- buffer.append(" attempts.\nExceptions:\n");
- for (Throwable t : exceptions) {
- buffer.append(t.toString());
- buffer.append("\n");
- }
- return buffer.toString();
- }
-
- private static String getMessage(final int numTries,
- final List<Throwable> exceptions) {
- StringBuilder buffer = new StringBuilder("Failed after attempts=");
- buffer.append(numTries + 1);
- buffer.append(", exceptions:\n");
- for (Throwable t : exceptions) {
- buffer.append(t.toString());
- buffer.append("\n");
- }
- return buffer.toString();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
deleted file mode 100644
index 2804a03..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ServiceException;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-
-public abstract class ServerCallable<T> {
- protected InetSocketAddress addr;
- protected long startTime;
- protected long endTime;
- protected Class<?> protocol;
- protected boolean asyncMode;
- protected boolean closeConn;
- protected RpcClientManager manager;
-
- public abstract T call(NettyClientBase client) throws Exception;
-
- public ServerCallable(RpcClientManager manager, InetSocketAddress addr, Class<?> protocol,
- boolean asyncMode) {
- this.manager = manager;
- this.addr = addr;
- this.protocol = protocol;
- this.asyncMode = asyncMode;
- }
-
- public void beforeCall() {
- this.startTime = System.currentTimeMillis();
- }
-
- public long getStartTime(){
- return startTime;
- }
-
- public void afterCall() {
- this.endTime = System.currentTimeMillis();
- }
-
- public long getEndTime(){
- return endTime;
- }
-
- boolean abort = false;
- public void abort() {
- abort = true;
- }
- /**
- * Run this instance with retries, timed waits,
- * and refinds of missing regions.
- *
- * @return an object of type T
- * @throws com.google.protobuf.ServiceException if a remote or network exception occurs
- */
-
- public T withRetries() throws ServiceException {
- //TODO configurable
- final long pause = 500; //ms
- final int numRetries = 3;
-
- for (int tries = 0; tries < numRetries; tries++) {
- NettyClientBase client = null;
- try {
- beforeCall();
- if(addr != null) {
- client = manager.getClient(addr, protocol, asyncMode);
- }
- return call(client);
- } catch (IOException ioe) {
- if(abort) {
- throw new ServiceException(ioe.getMessage(), ioe);
- }
- if (tries == numRetries - 1) {
- throw new ServiceException("Giving up after tries=" + tries, ioe);
- }
- } catch (Throwable t) {
- throw new ServiceException(t);
- } finally {
- afterCall();
- if(closeConn) {
- RpcClientManager.cleanup(client);
- }
- }
- try {
- Thread.sleep(pause * (tries + 1));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ServiceException("Giving up after tries=" + tries, e);
- }
- }
- return null;
- }
-
- /**
- * Run this instance against the server once.
- * @return an object of type T
- * @throws java.io.IOException if a remote or network exception occurs
- * @throws RuntimeException other unspecified error
- */
- public T withoutRetries() throws IOException, RuntimeException {
- NettyClientBase client = null;
- try {
- beforeCall();
- client = manager.getClient(addr, protocol, asyncMode);
- return call(client);
- } catch (Throwable t) {
- Throwable t2 = translateException(t);
- if (t2 instanceof IOException) {
- throw (IOException)t2;
- } else {
- throw new RuntimeException(t2);
- }
- } finally {
- afterCall();
- if(closeConn) {
- RpcClientManager.cleanup(client);
- }
- }
- }
-
- private static Throwable translateException(Throwable t) throws IOException {
- if (t instanceof UndeclaredThrowableException) {
- t = t.getCause();
- }
- if (t instanceof RemoteException && t.getCause() != null) {
- t = t.getCause();
- }
- return t;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 6f7fdd1..c86db80 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -170,45 +170,6 @@ public class TestBlockingRpc {
}
@Test
- @SetupRpcConnection(setupRpcClient = false)
- @Deprecated // serverCallable will be remove
- public void testRpcWithServiceCallable() throws Exception {
- RpcClientManager manager = RpcClientManager.getInstance();
- final SumRequest request = SumRequest.newBuilder()
- .setX1(1)
- .setX2(2)
- .setX3(3.15d)
- .setX4(2.0f).build();
-
- SumResponse response =
- new ServerCallable<SumResponse>(manager,
- server.getListenAddress(), DummyProtocol.class, false) {
- @Override
- public SumResponse call(NettyClientBase client) throws Exception {
- BlockingInterface stub2 = client.getStub();
- SumResponse response1 = stub2.sum(null, request);
- return response1;
- }
- }.withRetries();
-
- assertEquals(8.15d, response.getResult(), 1e-15);
-
- response =
- new ServerCallable<SumResponse>(manager,
- server.getListenAddress(), DummyProtocol.class, false) {
- @Override
- public SumResponse call(NettyClientBase client) throws Exception {
- BlockingInterface stub2 = client.getStub();
- SumResponse response1 = stub2.sum(null, request);
- return response1;
- }
- }.withoutRetries();
-
- assertTrue(8.15d == response.getResult());
- RpcClientManager.close();
- }
-
- @Test
public void testThrowException() throws Exception {
EchoMessage message = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
[3/6] tajo git commit: TAJO-1534: DelimitedTextFile return null
instead of a NullDatum. (jinho)
Posted by ji...@apache.org.
TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)
Closes #522
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b6b9d463
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b6b9d463
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b6b9d463
Branch: refs/heads/index_support
Commit: b6b9d46318d0aaa0346578b5ecf11b9d9ba74b0c
Parents: 4755410
Author: Jinho Kim <jh...@apache.org>
Authored: Wed May 6 12:11:37 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed May 6 12:11:37 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/storage/rcfile/RCFile.java | 14 ++---
.../tajo/storage/text/CSVLineDeserializer.java | 18 +++++-
.../org/apache/tajo/storage/TestStorages.java | 59 ++++++++++++++++++++
4 files changed, 83 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 952f852..a790655 100644
--- a/CHANGES
+++ b/CHANGES
@@ -108,6 +108,8 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)
+
TAJO-1574: Fix NPE on natural join.
(Contributed by Dongjoon Hyun, Committed by jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 44aabd4..62e5ed9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -1255,17 +1255,18 @@ public class RCFile {
for (int i = 0; i < targetColumnIndexes.length; i++) {
int tid = targetColumnIndexes[i];
+ SelectedColumn col = new SelectedColumn();
+ col.colIndex = tid;
if (tid < columnNumber) {
skippedColIDs[tid] = false;
-
- SelectedColumn col = new SelectedColumn();
- col.colIndex = tid;
col.runLength = 0;
col.prvLength = -1;
col.rowReadIndex = 0;
- selectedColumns[i] = col;
colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+ } else {
+ col.isNulled = true;
}
+ selectedColumns[i] = col;
}
currentKey = createKeyBuffer();
@@ -1583,10 +1584,7 @@ public class RCFile {
for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
SelectedColumn col = selectedColumns[selIx];
- if (col == null) {
- col = new SelectedColumn();
- col.isNulled = true;
- selectedColumns[selIx] = col;
+ if (col.isNulled) {
continue;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 6a8c7a9..03a0a26 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBufProcessor;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.FieldSerializerDeserializer;
import org.apache.tajo.storage.Tuple;
@@ -80,8 +81,14 @@ public class CSVLineDeserializer extends TextLineDeserializer {
if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
lineBuf.setIndex(start, start + fieldLength);
- Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
- output.put(currentIndex, datum);
+
+ try {
+ Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+ output.put(currentIndex, datum);
+ } catch (Exception e) {
+ output.put(currentIndex, NullDatum.get());
+ }
+
currentTarget++;
}
@@ -92,6 +99,13 @@ public class CSVLineDeserializer extends TextLineDeserializer {
start = end + 1;
currentIndex++;
}
+
+ /* If a text row has fewer fields than the table schema, set the remaining projected columns to NullDatum */
+ if (projection.length > currentTarget) {
+ for (; currentTarget < projection.length; currentTarget++) {
+ output.put(projection[currentTarget], NullDatum.get());
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 790ac4a..456ea00 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -952,4 +952,63 @@ public class TestStorages {
StorageManager.clearCache();
}
}
+
+ @Test
+ public void testLessThanSchemaSize() throws IOException {
+ /* RAW is an internal storage format, so its data must always match the schema size */
+ if (storeType == StoreType.RAW || storeType == StoreType.AVRO){
+ return;
+ }
+
+ Schema dataSchema = new Schema();
+ dataSchema.addColumn("col1", Type.FLOAT4);
+ dataSchema.addColumn("col2", Type.FLOAT8);
+ dataSchema.addColumn("col3", Type.INT2);
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+
+ Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
+ FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, dataSchema, tablePath);
+ appender.init();
+
+
+ Tuple expect = new VTuple(dataSchema.size());
+ expect.put(new Datum[]{
+ DatumFactory.createFloat4(Float.MAX_VALUE),
+ DatumFactory.createFloat8(Double.MAX_VALUE),
+ DatumFactory.createInt2(Short.MAX_VALUE)
+ });
+
+ appender.addTuple(expect);
+ appender.flush();
+ appender.close();
+
+ assertTrue(fs.exists(tablePath));
+ FileStatus status = fs.getFileStatus(tablePath);
+ Schema inSchema = new Schema();
+ inSchema.addColumn("col1", Type.FLOAT4);
+ inSchema.addColumn("col2", Type.FLOAT8);
+ inSchema.addColumn("col3", Type.INT2);
+ inSchema.addColumn("col4", Type.INT4);
+ inSchema.addColumn("col5", Type.INT8);
+
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, inSchema, fragment);
+
+ Schema target = new Schema();
+
+ target.addColumn("col2", Type.FLOAT8);
+ target.addColumn("col5", Type.INT8);
+ scanner.setTarget(target.toArray());
+ scanner.init();
+
+ Tuple tuple = scanner.next();
+ scanner.close();
+
+ assertEquals(expect.get(1), tuple.get(1));
+ assertEquals(NullDatum.get(), tuple.get(4));
+ }
}
[5/6] tajo git commit: TAJO-1556: "insert into select" with reordered
column list does not work.
Posted by ji...@apache.org.
TAJO-1556: "insert into select" with reordered column list does not work.
Closes #546
Signed-off-by: Jihoon Son <ji...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9b3824b5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9b3824b5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9b3824b5
Branch: refs/heads/index_support
Commit: 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
Parents: 04167bd
Author: Yongjin Choi <su...@gmail.com>
Authored: Wed May 6 18:41:51 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed May 6 18:42:29 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +++
.../java/org/apache/tajo/catalog/Schema.java | 12 ++++++---
.../tajo/engine/planner/TestLogicalPlanner.java | 23 +++++++++++++++--
.../tajo/engine/query/TestInsertQuery.java | 19 ++++++++++++++
.../TestInsertQuery/nation_diff_col_order.ddl | 1 +
.../testInsertWithDifferentColumnOrder.sql | 1 +
.../testInsertWithDifferentColumnOrder.result | 27 ++++++++++++++++++++
.../org/apache/tajo/plan/LogicalPlanner.java | 9 ++++---
8 files changed, 86 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ebd88cd..a8074cd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -111,6 +111,9 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1556: "insert into select" with reordered column list does not work.
+ (Contributed by Yongjin Choi, Committed by jihoon)
+
TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)
TAJO-1574: Fix NPE on natural join.
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index fcbd177..054cc2c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -157,13 +157,19 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
}
public Column getColumn(Column column) {
+ int idx = getIndex(column);
+ return idx >= 0 ? fields.get(idx) : null;
+ }
+
+ public int getIndex(Column column) {
if (!contains(column)) {
- return null;
+ return -1;
}
+
if (column.hasQualifier()) {
- return fields.get(fieldsByQualifiedName.get(column.getQualifiedName()));
+ return fieldsByQualifiedName.get(column.getQualifiedName());
} else {
- return fields.get(fieldsByName.get(column.getSimpleName()).get(0));
+ return fieldsByName.get(column.getSimpleName()).get(0);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 0b59bc7..af0aa6a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -1106,7 +1106,7 @@ public class TestLogicalPlanner {
// Table descriptions
//
// employee (name text, empid int4, deptname text)
- // dept (deptname text, nameger text)
+ // dept (deptname text, manager text)
// score (deptname text, score inet4)
static final String [] insertStatements = {
@@ -1115,7 +1115,8 @@ public class TestLogicalPlanner {
"insert into employee (name, deptname) select * from dept", // 2
"insert into location '/tmp/data' select name, empid from employee", // 3
"insert overwrite into employee (name, deptname) select * from dept", // 4
- "insert overwrite into LOCATION '/tmp/data' select * from dept" // 5
+ "insert overwrite into LOCATION '/tmp/data' select * from dept", // 5
+ "insert into employee (deptname, name) select deptname, manager from dept" // 6
};
@Test
@@ -1198,6 +1199,24 @@ public class TestLogicalPlanner {
assertTrue(insertNode.hasPath());
}
+ @Test
+ public final void testInsertInto6() throws PlanningException {
+ QueryContext qc = new QueryContext(util.getConfiguration(), session);
+
+ Expr expr = sqlAnalyzer.parse(insertStatements[6]);
+ LogicalPlan plan = planner.createPlan(qc, expr);
+ assertEquals(1, plan.getQueryBlocks().size());
+ InsertNode insertNode = getInsertNode(plan);
+
+ ProjectionNode subquery = insertNode.getChild();
+ Target[] targets = subquery.getTargets();
+ // targets MUST be manager, NULL as empid, deptname
+ assertEquals(targets[0].getNamedColumn().getQualifiedName(), "default.dept.manager");
+ assertEquals(targets[1].getAlias(), "empid");
+ assertEquals(targets[1].getEvalTree().getType(), EvalType.CONST);
+ assertEquals(targets[2].getNamedColumn().getQualifiedName(), "default.dept.deptname");
+ }
+
private static InsertNode getInsertNode(LogicalPlan plan) {
LogicalRootNode root = plan.getRootBlock().getRoot();
assertEquals(NodeType.INSERT, root.getChild().getType());
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 4a1f601..b3e3402 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -817,4 +817,23 @@ public class TestInsertQuery extends QueryTestCaseBase {
assertNotNull(resultDatas);
assertEquals(expected, resultDatas);
}
+
+ @Test
+ public final void testInsertWithDifferentColumnOrder() throws Exception {
+ ResultSet res = executeFile("nation_diff_col_order.ddl");
+ res.close();
+
+ CatalogService catalog = testingCluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(getCurrentDatabase(), "nation_diff"));
+
+ try {
+ res = executeFile("testInsertWithDifferentColumnOrder.sql");
+ res.close();
+
+ res = executeString("select * from nation_diff");
+ assertResultSet(res);
+ } finally {
+ executeString("drop table nation_diff purge;");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl b/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl
new file mode 100644
index 0000000..6998304
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl
@@ -0,0 +1 @@
+create table nation_diff (n_nationkey int8, n_name text, n_regionkey int8, n_comment text);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql
new file mode 100644
index 0000000..ad360f9
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql
@@ -0,0 +1 @@
+insert overwrite into nation_diff (n_comment, n_name) select n_comment, n_name from default.nation;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result b/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result
new file mode 100644
index 0000000..4cd3b81
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result
@@ -0,0 +1,27 @@
+n_nationkey,n_name,n_regionkey,n_comment
+-------------------------------
+null,ALGERIA,null, haggle. carefully final deposits detect slyly agai
+null,ARGENTINA,null,al foxes promise slyly according to the regular accounts. bold requests alon
+null,BRAZIL,null,y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
+null,CANADA,null,eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
+null,EGYPT,null,y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
+null,ETHIOPIA,null,ven packages wake quickly. regu
+null,FRANCE,null,refully final requests. regular, ironi
+null,GERMANY,null,l platelets. regular accounts x-ray: unusual, regular acco
+null,INDIA,null,ss excuses cajole slyly across the packages. deposits print aroun
+null,INDONESIA,null, slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull
+null,IRAN,null,efully alongside of the slyly final dependencies.
+null,IRAQ,null,nic deposits boost atop the quickly final requests? quickly regula
+null,JAPAN,null,ously. final, express gifts cajole a
+null,JORDAN,null,ic deposits are blithely about the carefully regular pa
+null,KENYA,null, pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t
+null,MOROCCO,null,rns. blithely bold courts among the closely regular packages use furiously bold platelets?
+null,MOZAMBIQUE,null,s. ironic, unusual asymptotes wake blithely r
+null,PERU,null,platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
+null,CHINA,null,c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos
+null,ROMANIA,null,ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
+null,SAUDI ARABIA,null,ts. silent requests haggle. closely express packages sleep across the blithely
+null,VIETNAM,null,hely enticingly express accounts. even, final
+null,RUSSIA,null, requests against the platelets use never according to the quickly regular pint
+null,UNITED KINGDOM,null,eans boost carefully special requests. accounts are. carefull
+null,UNITED STATES,null,y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index d1c1a15..e0b4f7e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -1524,7 +1524,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// we use only a sequence of preceding columns of target table's schema
// as target columns.
//
- // For example, consider a target table and an 'insert into' query are give as follows:
+ // For example, consider a target table and an 'insert into' query are given as follows:
//
// CREATE TABLE TB1 (col1 int, col2 int, col3 long);
// || ||
@@ -1586,11 +1586,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// Modifying projected columns by adding NULL constants
// It is because that table appender does not support target columns to be written.
List<Target> targets = TUtil.newList();
- for (int i = 0, j = 0; i < tableSchema.size(); i++) {
+ for (int i = 0; i < tableSchema.size(); i++) {
Column column = tableSchema.getColumn(i);
- if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
- targets.add(projectionNode.getTargets()[j++]);
+ int idxInProjectionNode = targetColumns.getIndex(column);
+ if (idxInProjectionNode >= 0 && idxInProjectionNode < projectionNode.getTargets().length) {
+ targets.add(projectionNode.getTargets()[idxInProjectionNode]);
} else {
targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
}
[4/6] tajo git commit: TAJO-1584: Remove QueryMaster client sharing
in TajoMaster and TajoWorker.
Posted by ji...@apache.org.
TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
Closes #559
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/04167bdc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/04167bdc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/04167bdc
Branch: refs/heads/index_support
Commit: 04167bdc3bb04b53c5a245a9c18b6426ade82a26
Parents: b6b9d46
Author: Jinho Kim <jh...@apache.org>
Authored: Wed May 6 18:13:40 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed May 6 18:13:40 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/conf/TajoConf.java | 2 -
.../org/apache/tajo/master/QueryInProgress.java | 31 ++---
.../querymaster/QueryMasterManagerService.java | 135 ++++++++-----------
.../tajo/worker/ExecutionBlockContext.java | 32 +++--
.../java/org/apache/tajo/worker/TajoWorker.java | 1 +
.../tajo/worker/TajoWorkerManagerService.java | 2 +
.../main/java/org/apache/tajo/worker/Task.java | 4 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 43 +++---
.../src/main/proto/QueryMasterProtocol.proto | 14 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 7 +-
.../org/apache/tajo/rpc/RpcClientManager.java | 9 ++
12 files changed, 133 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a790655..ebd88cd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
+ (jinho)
+
TAJO-1563: Improve RPC error handling. (jinho)
TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index bfba290..46e7618 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -224,8 +224,6 @@ public class TajoConf extends Configuration {
HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()),
// RPC --------------------------------------------------------------------
- RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
-
// Internal RPC Client
INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index d2286cf..6a074a2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -31,14 +32,13 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.NetUtils;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -92,7 +92,9 @@ public class QueryInProgress {
try {
getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
if (queryMasterRpcClient != null) {
- queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+ queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture);
+ callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
} catch (Throwable e) {
catchException("Failed to kill query " + queryId + " by exception " + e, e);
@@ -111,9 +113,7 @@ public class QueryInProgress {
masterContext.getResourceManager().releaseQueryMaster(queryId);
- if(queryMasterRpc != null) {
- RpcClientManager.cleanup(queryMasterRpc);
- }
+ RpcClientManager.cleanup(queryMasterRpc);
try {
masterContext.getHistoryWriter().appendAndFlush(queryInfo);
@@ -156,8 +156,9 @@ public class QueryInProgress {
private void connectQueryMaster() throws Exception {
InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
- queryMasterRpc =
- RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true);
+
+ RpcClientManager.cleanup(queryMasterRpc);
+ queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true);
queryMasterRpcClient = queryMasterRpc.getStub();
}
@@ -177,11 +178,7 @@ public class QueryInProgress {
if(queryMasterRpcClient == null) {
connectQueryMaster();
}
- if(queryMasterRpcClient == null) {
- LOG.info("No QueryMaster connection info.");
- //TODO wait
- return;
- }
+
LOG.info("Call executeQuery to :" +
queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
@@ -192,11 +189,15 @@ public class QueryInProgress {
.setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
.setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
- queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+ CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+ queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture);
+ callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
querySubmitted.set(true);
getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
} catch (Exception e) {
LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e);
+ catchException(e.getMessage(), e);
} finally {
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 85cc553..59933a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -115,10 +115,6 @@ public class QueryMasterManagerService extends CompositeService
return bindAddr;
}
- public String getHostAndPort() {
- return bindAddr.getHostName() + ":" + bindAddr.getPort();
- }
-
@Override
public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
@@ -136,127 +132,106 @@ public class QueryMasterManagerService extends CompositeService
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
+ controller.setFailed(e.getMessage());
}
}
@Override
public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
- TaskAttemptId attemptId = new TaskAttemptId(request.getId());
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
- if (queryMasterTask == null) {
- queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
- }
- Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
- Task task = sq.getTask(attemptId.getTaskId());
- TaskAttempt attempt = task.getAttempt(attemptId.getId());
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
+ TaskAttemptId attemptId = new TaskAttemptId(request.getId());
+ QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+ if (queryMasterTask == null) {
+ queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+ }
+ Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
+ Task task = sq.getTask(attemptId.getTaskId());
+ TaskAttempt attempt = task.getAttempt(attemptId.getId());
- if(LOG.isDebugEnabled()){
- LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
- }
+ if(LOG.isDebugEnabled()){
+ LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+ }
- if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
- LOG.warn(attemptId + " Killed");
- attempt.handle(
- new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
- } else {
- queryMasterTask.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
+ if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+ LOG.warn(attemptId + " Killed");
+ attempt.handle(
+ new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+ } else {
+ queryMasterTask.getEventHandler().handle(
+ new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
}
+
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void ping(RpcController controller,
TajoIdProtos.ExecutionBlockIdProto requestProto,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- done.run(TajoWorker.TRUE_PROTO);
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
- if (queryMasterTask != null) {
- queryMasterTask.handleTaskFailed(report);
- } else {
- LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+ new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+ if (queryMasterTask != null) {
+ queryMasterTask.handleTaskFailed(report);
+ } else {
+ LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
}
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
- if (queryMasterTask != null) {
- queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+ new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+ if (queryMasterTask != null) {
+ queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
}
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void doneExecutionBlock(
RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
+ RpcCallback<PrimitiveProtos.NullProto> done) {
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
if (queryMasterTask != null) {
ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
}
- done.run(TajoWorker.TRUE_PROTO);
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
+ RpcCallback<PrimitiveProtos.NullProto> done) {
QueryId queryId = new QueryId(request);
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
if (queryMasterTask != null) {
- Query query = queryMasterTask.getQuery();
- if (query != null) {
- query.handle(new QueryEvent(queryId, QueryEventType.KILL));
- }
+ queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
}
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void executeQuery(RpcController controller,
TajoWorkerProtocol.QueryExecutionRequestProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
-
- QueryId queryId = new QueryId(request.getQueryId());
- LOG.info("Receive executeQuery request:" + queryId);
- queryMaster.handle(new QueryStartEvent(queryId,
- new Session(request.getSession()),
- new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
- request.getQueryContext()), request.getExprInJson().getValue(),
- request.getLogicalPlanJson().getValue()));
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
+ QueryId queryId = new QueryId(request.getQueryId());
+ LOG.info("Receive executeQuery request:" + queryId);
+ queryMaster.handle(new QueryStartEvent(queryId,
+ new Session(request.getSession()),
+ new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
+ request.getQueryContext()), request.getExprInJson().getValue(),
+ request.getLogicalPlanJson().getValue()));
+ done.run(TajoWorker.NULL_PROTO);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 0d26e6c..cd4b6a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
public class ExecutionBlockContext {
/** class logger */
@@ -78,6 +79,8 @@ public class ExecutionBlockContext {
private TajoQueryEngine queryEngine;
private RpcClientManager connManager;
private InetSocketAddress qmMasterAddr;
+ private NettyClientBase client;
+ private QueryMasterProtocol.QueryMasterProtocolService.Interface stub;
private WorkerConnectionInfo queryMaster;
private TajoConf systemConf;
// for the doAs block
@@ -132,16 +135,14 @@ public class ExecutionBlockContext {
// initialize DFS and LocalFileSystems
this.taskOwner = taskOwner;
+ this.stub = getRpcClient().getStub();
this.reporter.startReporter();
-
// resource intiailization
try{
this.resource.initialize(queryContext, plan);
} catch (Throwable e) {
try {
- NettyClientBase client = getQueryMasterConnection();
- QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
- stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+ getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
} catch (Throwable t) {
//ignore
}
@@ -153,9 +154,20 @@ public class ExecutionBlockContext {
return resource;
}
- public NettyClientBase getQueryMasterConnection()
+ private NettyClientBase getRpcClient()
throws NoSuchMethodException, ConnectException, ClassNotFoundException {
- return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true);
+ if (client != null) return client;
+
+ client = connManager.newClient(qmMasterAddr, QueryMasterProtocol.class, true);
+ return client;
+ }
+
+ public Interface getStub() {
+ return stub;
+ }
+
+ public boolean isStopped() {
+ return stop.get();
}
public void stop(){
@@ -184,6 +196,7 @@ public class ExecutionBlockContext {
tasks.clear();
resource.release();
+ RpcClientManager.cleanup(client);
}
public TajoConf getConf() {
@@ -282,8 +295,7 @@ public class ExecutionBlockContext {
/* This case is that worker did not ran tasks */
if(completedTasksNum.get() == 0) return;
- NettyClientBase client = getQueryMasterConnection();
- QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+ Interface stub = getStub();
ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
reporterBuilder.setEbId(ebId.getProto());
@@ -379,10 +391,8 @@ public class ExecutionBlockContext {
public void run() {
while (!reporterStop.get() && !Thread.interrupted()) {
- NettyClientBase client = null;
try {
- client = getQueryMasterConnection();
- QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();
+ Interface masterStub = getStub();
if(tasks.size() == 0){
masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 79b83e4..b666f80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -77,6 +77,7 @@ import static org.apache.tajo.conf.TajoConf.ConfVars;
public class TajoWorker extends CompositeService {
public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+ public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build();
private static final Log LOG = LogFactory.getLog(TajoWorker.class);
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 71d96c4..bbf8564 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -127,6 +127,7 @@ public class TajoWorkerManagerService extends CompositeService
done.run(TajoWorker.TRUE_PROTO);
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
+ controller.setFailed(t.getMessage());
done.run(TajoWorker.FALSE_PROTO);
}
}
@@ -142,6 +143,7 @@ public class TajoWorkerManagerService extends CompositeService
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
+ controller.setFailed(e.getMessage());
done.run(TajoWorker.FALSE_PROTO);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index a983f78..53ed73e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -440,9 +440,7 @@ public class Task {
executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
- NettyClientBase client = executionBlockContext.getQueryMasterConnection();
-
- QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
+ QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
if (context.isStopped()) {
context.setExecutorProgress(0.0f);
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 6076913..31f25f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -194,24 +194,8 @@ public class TaskRunner extends AbstractService {
CallFuture<TaskRequestProto> callFuture = null;
TaskRequestProto taskRequest = null;
- while(!stopped) {
- NettyClientBase client;
- try {
- client = executionBlockContext.getQueryMasterConnection();
- } catch (ConnectException ce) {
- // NettyClientBase throws ConnectTimeoutException if connection was failed
- stop();
- getContext().stopTaskRunner(getId());
- LOG.error("Connecting to QueryMaster was failed.", ce);
- break;
- } catch (Throwable t) {
- LOG.fatal("Unable to handle exception: " + t.getMessage(), t);
- stop();
- getContext().stopTaskRunner(getId());
- break;
- }
-
- QueryMasterProtocolService.Interface qmClientService = client.getStub();
+ while(!stopped && !executionBlockContext.isStopped()) {
+ QueryMasterProtocolService.Interface qmClientService = executionBlockContext.getStub();
try {
if (callFuture == null) {
@@ -243,8 +227,12 @@ public class TaskRunner extends AbstractService {
}
continue;
} catch (ExecutionException ee) {
- LOG.error(ee.getMessage(), ee);
- break;
+ if(!getContext().isStopped()){
+ LOG.error(ee.getMessage(), ee);
+ } else {
+ /* EB is stopped */
+ break;
+ }
}
if (taskRequest != null) {
@@ -253,9 +241,6 @@ public class TaskRunner extends AbstractService {
// immediately.
if (taskRequest.getShouldDie()) {
LOG.info("Received ShouldDie flag:" + getId());
- stop();
- //notify to TaskRunnerManager
- getContext().stopTaskRunner(getId());
} else {
getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
LOG.info("Accumulated Received Task: " + (++receivedNum));
@@ -268,7 +253,7 @@ public class TaskRunner extends AbstractService {
}
LOG.info("Initializing: " + taskAttemptId);
- Task task;
+ Task task = null;
try {
task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
new TaskRequestImpl(taskRequest));
@@ -283,20 +268,22 @@ public class TaskRunner extends AbstractService {
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
fatalError(qmClientService, taskAttemptId, t.getMessage());
+ if(task != null) {
+ task.cleanupTask();
+ }
} finally {
callFuture = null;
taskRequest = null;
}
}
- } else {
- stop();
- //notify to TaskRunnerManager
- getContext().stopTaskRunner(getId());
}
} catch (Throwable t) {
LOG.fatal(t.getMessage(), t);
}
}
+ stop();
+ //notify to TaskRunnerManager
+ getContext().stopTaskRunner(getId());
}
});
taskLauncher.start();
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index ae20309..855c2c6 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -34,13 +34,13 @@ package hadoop.yarn;
service QueryMasterProtocolService {
//from Worker
rpc getTask(GetTaskRequestProto) returns (TaskRequestProto);
- rpc statusUpdate (TaskStatusProto) returns (BoolProto);
- rpc ping (ExecutionBlockIdProto) returns (BoolProto);
- rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
- rpc done (TaskCompletionReport) returns (BoolProto);
- rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);
+ rpc statusUpdate (TaskStatusProto) returns (NullProto);
+ rpc ping (ExecutionBlockIdProto) returns (NullProto);
+ rpc fatalError(TaskFatalErrorReport) returns (NullProto);
+ rpc done (TaskCompletionReport) returns (NullProto);
+ rpc doneExecutionBlock(ExecutionBlockReport) returns (NullProto);
//from TajoMaster's QueryJobManager
- rpc killQuery(QueryIdProto) returns (BoolProto);
- rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+ rpc killQuery(QueryIdProto) returns (NullProto);
+ rpc executeQuery(QueryExecutionRequestProto) returns (NullProto);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 8f6f9ed..0d86527 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -195,7 +195,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
if (maxRetries > retries) {
retries++;
- LOG.warn(future.cause() + " Try to reconnect : " + getKey().addr);
+ LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + " Try to reconnect : " + getKey().addr);
try {
Thread.sleep(RpcConstants.DEFAULT_PAUSE);
} catch (InterruptedException e) {
@@ -246,8 +246,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
private String getErrorMessage(String message) {
return "Exception [" + getKey().protocolClass.getCanonicalName() +
- "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
- getChannel().remoteAddress()) + ")]: " + message;
+ "(" + getKey().addr + ")]: " + message;
}
@Override
@@ -332,7 +331,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
throws Exception {
Throwable rootCause = ExceptionUtils.getRootCause(cause);
- LOG.error(getKey().addr + "," + getKey().protocolClass + "," + ExceptionUtils.getMessage(rootCause), rootCause);
+ LOG.error(getErrorMessage(ExceptionUtils.getMessage(rootCause)), rootCause);
if (cause instanceof RecoverableException) {
sendException((RecoverableException) cause);
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index f8def7f..111754e 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -130,6 +130,15 @@ public class RpcClientManager {
return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing);
}
+ public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr,
+ Class<?> protocolClass,
+ boolean asyncMode)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+ return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode),
+ retries, getTimeoutSeconds(), TimeUnit.SECONDS, true);
+ }
+
public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key,
int retries,
long timeout,
[2/6] tajo git commit: TAJO-1583: Remove ServerCallable in RPC client. (jinho)
Posted by ji...@apache.org.
TAJO-1583: Remove ServerCallable in RPC client. (jinho)
Closes #556
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47554105
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47554105
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47554105
Branch: refs/heads/index_support
Commit: 475541057891518e08e5a18ebbbf916c1ad60c10
Parents: 9540f16
Author: Jinho Kim <jh...@apache.org>
Authored: Thu Apr 30 16:51:56 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Thu Apr 30 16:51:56 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/catalog/AbstractCatalogClient.java | 569 ++++++-------------
.../org/apache/tajo/catalog/CatalogClient.java | 49 +-
.../org/apache/tajo/catalog/CatalogServer.java | 8 +-
.../tajo/catalog/LocalCatalogWrapper.java | 20 +-
.../tajo/client/CatalogAdminClientImpl.java | 236 +++-----
.../org/apache/tajo/client/QueryClientImpl.java | 328 +++++------
.../apache/tajo/client/SessionConnection.java | 275 ++++-----
.../cli/tsql/TestDefaultCliOutputFormatter.java | 4 -
.../apache/tajo/querymaster/TestKillQuery.java | 63 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 3 +-
.../tajo/rpc/RetriesExhaustedException.java | 104 ----
.../org/apache/tajo/rpc/ServerCallable.java | 148 -----
.../org/apache/tajo/rpc/TestBlockingRpc.java | 39 --
14 files changed, 618 insertions(+), 1230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8bda2bd..952f852 100644
--- a/CHANGES
+++ b/CHANGES
@@ -214,6 +214,8 @@ Release 0.11.0 - unreleased
TASKS
+ TAJO-1583: Remove ServerCallable in RPC client. (jinho)
+
TAJO-1587: Upgrade java version to 1.7 for Travis CI. (jihoon)
TAJO-1559: Fix data model description (tinyint, smallint).
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 49be29a..766f6c2 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,16 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.ProtoUtil;
-import java.net.InetSocketAddress;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -46,50 +42,27 @@ import java.util.List;
/**
* CatalogClient provides a client API to access the catalog server.
*/
-public abstract class AbstractCatalogClient implements CatalogService {
- private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
+public abstract class AbstractCatalogClient implements CatalogService, Closeable {
+ protected final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
- protected ServiceTracker serviceTracker;
- protected RpcClientManager manager;
- protected InetSocketAddress catalogServerAddr;
protected TajoConf conf;
- abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
-
- public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
- this.manager = RpcClientManager.getInstance();
- this.catalogServerAddr = catalogServerAddr;
- this.serviceTracker = ServiceTrackerFactory.get(conf);
+ public AbstractCatalogClient(TajoConf conf) {
this.conf = conf;
}
- private InetSocketAddress getCatalogServerAddr() {
- if (catalogServerAddr == null) {
- return null;
- } else {
-
- if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return catalogServerAddr;
- } else {
- return serviceTracker.getCatalogAddress();
- }
- }
- }
+ abstract CatalogProtocolService.BlockingInterface getStub() throws ServiceException;
@Override
public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
+ CatalogProtocolService.BlockingInterface stub = getStub();
- CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
- builder.setTablespaceName(tablespaceName);
- builder.setTablespaceUri(tablespaceUri);
- return stub.createTablespace(null, builder.build()).getValue();
- }
- }.withRetries();
- } catch (ServiceException e) {
+ CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
+ builder.setTablespaceName(tablespaceName);
+ builder.setTablespaceUri(tablespaceUri);
+ return stub.createTablespace(null, builder.build()).getValue();
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
}
@@ -98,12 +71,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean dropTablespace(final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -113,12 +82,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean existTablespace(final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -128,46 +93,32 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<String> getAllTablespaceNames() {
try {
- return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<String> call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
- return ProtoUtil.convertStrings(response);
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
+ return ProtoUtil.convertStrings(response);
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<String>();
}
}
@Override
public List<TablespaceProto> getAllTablespaces() {
try {
- return new ServerCallable<List<TablespaceProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TablespaceProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
- return response.getTablespaceList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
+ return response.getTablespaceList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TablespaceProto>();
}
}
@Override
public TablespaceProto getTablespace(final String tablespaceName) {
try {
- return new ServerCallable<TablespaceProto>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public TablespaceProto call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -177,12 +128,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.alterTablespace(null, alterTablespace).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.alterTablespace(null, alterTablespace).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -192,18 +139,14 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
+ CatalogProtocolService.BlockingInterface stub = getStub();
- CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
- builder.setDatabaseName(databaseName);
- if (tablespaceName != null) {
- builder.setTablespaceName(tablespaceName);
- }
- return stub.createDatabase(null, builder.build()).getValue();
- }
- }.withRetries();
+ CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
+ builder.setDatabaseName(databaseName);
+ if (tablespaceName != null) {
+ builder.setTablespaceName(tablespaceName);
+ }
+ return stub.createDatabase(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -213,12 +156,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean dropDatabase(final String databaseName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -228,12 +167,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean existDatabase(final String databaseName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -243,50 +178,36 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<String> getAllDatabaseNames() {
try {
- return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<String> call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
- return ProtoUtil.convertStrings(response);
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
+ return ProtoUtil.convertStrings(response);
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<String>();
}
}
@Override
public List<DatabaseProto> getAllDatabases() {
try {
- return new ServerCallable<List<DatabaseProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<DatabaseProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
- return response.getDatabaseList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
+ return response.getDatabaseList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<DatabaseProto>();
}
}
@Override
public final TableDesc getTableDesc(final String databaseName, final String tableName) {
try {
- return new ServerCallable<TableDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public TableDesc call(NettyClientBase client) throws ServiceException {
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -302,89 +223,60 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public List<TableDescriptorProto> getAllTables() {
try {
- return new ServerCallable<List<TableDescriptorProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TableDescriptorProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
- return response.getTableList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
+ return response.getTableList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TableDescriptorProto>();
}
}
@Override
public List<TableOptionProto> getAllTableOptions() {
try {
- return new ServerCallable<List<TableOptionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TableOptionProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
- return response.getTableOptionList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
+ return response.getTableOptionList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TableOptionProto>();
}
}
@Override
public List<TableStatsProto> getAllTableStats() {
try {
- return new ServerCallable<List<TableStatsProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TableStatsProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
- return response.getStatList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
+ return response.getStatList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TableStatsProto>();
}
}
@Override
public List<ColumnProto> getAllColumns() {
try {
- return new ServerCallable<List<ColumnProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<ColumnProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
- return response.getColumnList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
+ return response.getColumnList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<ColumnProto>();
}
}
@Override
public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -394,17 +286,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean existPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existPartitionMethod(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existPartitionMethod(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -415,18 +302,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
public final PartitionDescProto getPartition(final String databaseName, final String tableName,
final String partitionName) {
try {
- return new ServerCallable<PartitionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public PartitionDescProto call(NettyClientBase client) throws ServiceException {
-
- PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
- builder.setPartitionName(partitionName);
+ PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
+ builder.setPartitionName(partitionName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.getPartitionByPartitionName(null, builder.build());
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.getPartitionByPartitionName(null, builder.build());
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -436,94 +318,70 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) {
try {
- return new ServerCallable<List<PartitionDescProto>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class,
- false) {
- public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException {
+ PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
- return response.getPartitionList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
+ return response.getPartitionList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<PartitionDescProto>();
}
}
@Override
public List<TablePartitionProto> getAllPartitions() {
try {
- return new ServerCallable<List<TablePartitionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TablePartitionProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
- return response.getPartList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
+ return response.getPartList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TablePartitionProto>();
}
}
@Override
public final Collection<String> getAllTableNames(final String databaseName) {
try {
- return new ServerCallable<Collection<String>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<String> call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
- return ProtoUtil.convertStrings(response);
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
+ return ProtoUtil.convertStrings(response);
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<String>();
}
}
@Override
public final Collection<FunctionDesc> getFunctions() {
- try {
- return new ServerCallable<Collection<FunctionDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
- List<FunctionDesc> list = new ArrayList<FunctionDesc>();
- GetFunctionsResponse response;
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- response = stub.getFunctions(null, NullProto.newBuilder().build());
- int size = response.getFunctionDescCount();
- for (int i = 0; i < size; i++) {
- try {
- list.add(new FunctionDesc(response.getFunctionDesc(i)));
- } catch (ClassNotFoundException e) {
- LOG.error(e, e);
- return null;
- }
- }
+ List<FunctionDesc> list = new ArrayList<FunctionDesc>();
+ try {
+ GetFunctionsResponse response;
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ response = stub.getFunctions(null, NullProto.newBuilder().build());
+ int size = response.getFunctionDescCount();
+ for (int i = 0; i < size; i++) {
+ try {
+ list.add(new FunctionDesc(response.getFunctionDesc(i)));
+ } catch (ClassNotFoundException e) {
+ LOG.error(e, e);
return list;
}
- }.withRetries();
+ }
+ return list;
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return list;
}
}
@Override
public final boolean createTable(final TableDesc desc) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.createTable(null, desc.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.createTable(null, desc.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -537,17 +395,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
final String simpleName = splitted[1];
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(simpleName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(simpleName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropTable(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropTable(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -561,17 +414,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
"tableName cannot be composed of multiple parts, but it is \"" + tableName + "\"");
}
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existsTable(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existsTable(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -586,12 +434,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean createIndex(final IndexDesc index) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.createIndex(null, index.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.createIndex(null, index.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -601,16 +445,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean existIndexByName(final String databaseName, final String indexName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- IndexNameProto.Builder builder = IndexNameProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setIndexName(indexName);
+ IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setIndexName(indexName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexByName(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existIndexByName(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -620,17 +460,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
+ GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+ builder.setColumnName(columnName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexByColumn(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existIndexByColumn(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -640,17 +476,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final IndexDesc getIndexByName(final String databaseName, final String indexName) {
try {
- return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public IndexDesc call(NettyClientBase client) throws ServiceException {
-
- IndexNameProto.Builder builder = IndexNameProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setIndexName(indexName);
+ IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setIndexName(indexName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return new IndexDesc(stub.getIndexByName(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return new IndexDesc(stub.getIndexByName(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -662,17 +493,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
final String tableName,
final String columnName) {
try {
- return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public IndexDesc call(NettyClientBase client) throws ServiceException {
+ GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+ builder.setColumnName(columnName);
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -683,17 +509,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
public boolean dropIndex(final String databaseName,
final String indexName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
+ IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setIndexName(indexName);
- IndexNameProto.Builder builder = IndexNameProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setIndexName(indexName);
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropIndex(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropIndex(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -703,30 +524,20 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public List<IndexProto> getAllIndexes() {
try {
- return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<IndexProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
- return response.getIndexList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
+ return response.getIndexList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<IndexProto>();
}
}
@Override
public final boolean createFunction(final FunctionDesc funcDesc) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.createFunction(null, funcDesc.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.createFunction(null, funcDesc.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -736,15 +547,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean dropFunction(final String signature) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
- builder.setSignature(signature);
+ UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
+ builder.setSignature(signature);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropFunction(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropFunction(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -769,24 +576,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
FunctionDescProto descProto = null;
try {
- descProto = new ServerCallable<FunctionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public FunctionDescProto call(NettyClientBase client) throws ServiceException {
- try {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.getFunctionMeta(null, builder.build());
- } catch (NoSuchFunctionException e) {
- abort();
- throw e;
- }
- }
- }.withRetries();
- } catch(ServiceException e) {
- // this is not good. we need to define user massage exception
- if(e.getCause() instanceof NoSuchFunctionException){
- LOG.debug(e.getMessage());
- } else {
- LOG.error(e.getMessage(), e);
- }
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ descProto = stub.getFunctionMeta(null, builder.build());
+ } catch (NoSuchFunctionException e) {
+ LOG.debug(e.getMessage());
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
}
if (descProto == null) {
@@ -819,27 +614,21 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.containFunction(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.containFunction(null, builder.build()).getValue();
+ } catch (InvalidOperationException e) {
+ LOG.error(e.getMessage());
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return false;
}
+ return false;
}
@Override
public final boolean alterTable(final AlterTableDesc desc) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.alterTable(null, desc.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.alterTable(null, desc.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -849,12 +638,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.updateTableStats(null, updateTableStatsProto).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.updateTableStats(null, updateTableStatsProto).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 7666a97..80ded4a 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -18,35 +18,72 @@
package org.apache.tajo.catalog;
+import com.google.protobuf.ServiceException;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.NetUtils;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
/**
* CatalogClient provides a client API to access the catalog server.
*/
public class CatalogClient extends AbstractCatalogClient {
+ protected NettyClientBase client;
+ protected ServiceTracker serviceTracker;
+ protected InetSocketAddress catalogServerAddr;
/**
* @throws java.io.IOException
*
*/
public CatalogClient(final TajoConf conf) throws IOException {
- super(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS)));
+ super(conf);
+ this.catalogServerAddr = NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS));
+ this.serviceTracker = ServiceTrackerFactory.get(conf);
}
- public CatalogClient(TajoConf conf, String host, int port) throws IOException {
- super(conf, NetUtils.createSocketAddr(host, port));
- }
@Override
- BlockingInterface getStub(NettyClientBase client) {
- return client.getStub();
+ BlockingInterface getStub() throws ServiceException {
+ return getCatalogConnection().getStub();
+ }
+
+ private InetSocketAddress getCatalogServerAddr() {
+ if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ return catalogServerAddr;
+ } else {
+ return serviceTracker.getCatalogAddress();
+ }
}
+ public synchronized NettyClientBase getCatalogConnection() throws ServiceException {
+ if (client != null && client.isConnected()) return client;
+ else {
+ try {
+ if (client != null && client.isConnected()) return client;
+ RpcClientManager.cleanup(client);
+
+ int retry = conf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES);
+ // Client do not closed on idle state for support high available
+ this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, false,
+ retry, 0, TimeUnit.SECONDS, false);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ return client;
+ }
+ }
+
+ @Override
public void close() {
+ RpcClientManager.cleanup(client);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index e9fb177..f2e9795 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -33,7 +33,6 @@ import org.apache.tajo.annotation.ThreadSafe;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.store.CatalogStore;
import org.apache.tajo.catalog.store.DerbyStore;
@@ -61,7 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
/**
* This class provides the catalog service. The catalog service enables clients
@@ -1192,7 +1190,7 @@ public class CatalogServer extends AbstractService {
if (functions.containsKey(funcDesc.getSignature())) {
FunctionDescProto found = findFunctionStrictType(funcDesc, true);
if (found != null) {
- throw new AlreadyExistsFunctionException(signature.toString());
+ throw new ServiceException(new AlreadyExistsFunctionException(signature.toString()));
}
}
@@ -1209,7 +1207,7 @@ public class CatalogServer extends AbstractService {
throws ServiceException {
if (!containFunction(request.getSignature())) {
- throw new NoSuchFunctionException(request.getSignature(), new DataType[] {});
+ throw new ServiceException(new NoSuchFunctionException(request.getSignature(), new DataType[]{}));
}
functions.remove(request.getSignature());
@@ -1231,7 +1229,7 @@ public class CatalogServer extends AbstractService {
}
if (function == null) {
- throw new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList());
+ throw new ServiceException(new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList()));
} else {
return function;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
index df9bd2c..35e9e2e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
@@ -22,9 +22,6 @@
package org.apache.tajo.catalog;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-
-import java.io.IOException;
/**
* This class provides a catalog service interface in
@@ -34,20 +31,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
private CatalogServer catalog;
private CatalogProtocol.CatalogProtocolService.BlockingInterface stub;
- public LocalCatalogWrapper(final TajoConf conf) throws IOException {
- super(conf, null);
- this.catalog = new CatalogServer();
- this.catalog.init(conf);
- this.catalog.start();
- this.stub = catalog.getHandler();
- }
-
public LocalCatalogWrapper(final CatalogServer server) {
this(server, server.getConf());
}
public LocalCatalogWrapper(final CatalogServer server, final TajoConf conf) {
- super(conf, null);
+ super(conf);
this.catalog = server;
this.stub = server.getHandler();
}
@@ -57,7 +46,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
}
@Override
- CatalogProtocol.CatalogProtocolService.BlockingInterface getStub(NettyClientBase client) {
+ CatalogProtocol.CatalogProtocolService.BlockingInterface getStub() {
return stub;
}
+
+ @Override
+ public void close() {
+ //nothing to do
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 9d0e427..9397fcf 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@ -27,10 +27,8 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
import java.io.IOException;
import java.net.URI;
@@ -48,79 +46,45 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
@Override
public boolean createDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMaster = client.getStub();
- return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMaster = client.getStub();
+ return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
}
@Override
public boolean existDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMaster = client.getStub();
- return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMaster = client.getStub();
+ return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
}
@Override
public boolean dropDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
}
@Override
public List<String> getAllDatabaseNames() throws ServiceException {
- return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public List<String> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
}
public boolean existTable(final String tableName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
}
@Override
@@ -133,32 +97,25 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
throws SQLException, ServiceException {
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setName(tableName);
- builder.setSchema(schema.getProto());
- builder.setMeta(meta.getProto());
- builder.setPath(path.toString());
- if (partitionMethodDesc != null) {
- builder.setPartition(partitionMethodDesc.getProto());
- }
- ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setName(tableName);
+ builder.setSchema(schema.getProto());
+ builder.setMeta(meta.getProto());
+ builder.setPath(path.toString());
+ if (partitionMethodDesc != null) {
+ builder.setPartition(partitionMethodDesc.getProto());
+ }
+ ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+ if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+ }
}
@Override
@@ -169,94 +126,67 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
@Override
public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setName(tableName);
- builder.setPurge(purge);
- return tajoMasterService.dropTable(null, builder.build()).getValue();
- }
-
- }.withRetries();
+ ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setName(tableName);
+ builder.setPurge(purge);
+ return tajoMasterService.dropTable(null, builder.build()).getValue();
}
@Override
public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
- return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public List<String> call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- if (databaseName != null) {
- builder.setDatabaseName(databaseName);
- }
- ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
- return res.getTablesList();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ if (databaseName != null) {
+ builder.setDatabaseName(databaseName);
+ }
+ ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
+ return res.getTablesList();
}
@Override
public TableDesc getTableDesc(final String tableName) throws ServiceException {
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setTableName(tableName);
+ ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
+ if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
+ }
}
@Override
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
- return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- String paramFunctionName = functionName == null ? "" : functionName;
- ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
- connection.convertSessionedString(paramFunctionName));
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
- return res.getFunctionsList();
- } else {
- throw new SQLException(res.getErrorMessage());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ String paramFunctionName = functionName == null ? "" : functionName;
+ ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+ connection.convertSessionedString(paramFunctionName));
+ if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+ return res.getFunctionsList();
+ } else {
+ throw new ServiceException(new SQLException(res.getErrorMessage()));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 99c58b6..53889fe 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -28,11 +28,10 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.jdbc.FetchResultSet;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.util.ProtoUtil;
import java.io.IOException;
@@ -40,6 +39,7 @@ import java.net.InetSocketAddress;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.tajo.ipc.ClientProtos.*;
import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
@@ -102,7 +102,7 @@ public class QueryClientImpl implements QueryClient {
public void closeNonForwardQuery(QueryId queryId) {
NettyClientBase tmClient = null;
try {
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
TajoMasterClientProtocolService.BlockingInterface tajoMaster = tmClient.getStub();
connection.checkSessionAndGet(tmClient);
@@ -153,50 +153,37 @@ public class QueryClientImpl implements QueryClient {
@Override
public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
- return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ final QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
-
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
- SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
- if (response.getResultCode() == ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- }
- return response;
- }
- }.withRetries();
+ SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
+ if (response.getResultCode() == ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ }
+ return response;
}
@Override
public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ final QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(json);
+ builder.setIsJson(true);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.submitQuery(null, builder.build());
- }
- }.withRetries();
+ return tajoMasterService.submitQuery(null, builder.build());
}
@Override
@@ -308,7 +295,7 @@ public class QueryClientImpl implements QueryClient {
NettyClientBase tmClient = null;
try {
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
connection.checkSessionAndGet(tmClient);
builder.setSessionId(connection.sessionId);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
@@ -348,7 +335,7 @@ public class QueryClientImpl implements QueryClient {
try {
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
connection.checkSessionAndGet(tmClient);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
@@ -369,42 +356,26 @@ public class QueryClientImpl implements QueryClient {
throws ServiceException {
try {
- final ServerCallable<ClientProtos.SerializedResultSet> callable =
- new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
- builder.setFetchRowNum(fetchRowNum);
- try {
- GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
- abort();
- throw new ServiceException(response.getErrorMessage());
- }
-
- return response.getResultSet();
- } catch (ServiceException e) {
- abort();
- throw e;
- } catch (Throwable t) {
- throw new ServiceException(t.getMessage(), t);
- }
- }
- };
-
- ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+ builder.setFetchRowNum(fetchRowNum);
+
+ GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
+ if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+ throw new ServiceException(response.getErrorMessage());
+ }
+
+ ClientProtos.SerializedResultSet resultSet = response.getResultSet();
return new TajoMemoryResultSet(queryId,
- new Schema(serializedResultSet.getSchema()),
- serializedResultSet.getSerializedTuplesList(),
- serializedResultSet.getSerializedTuplesCount(),
+ new Schema(resultSet.getSchema()),
+ resultSet.getSerializedTuplesList(),
+ resultSet.getSerializedTuplesCount(),
getClientSideSessionVars());
} catch (ServiceException e) {
throw e;
@@ -416,119 +387,86 @@ public class QueryClientImpl implements QueryClient {
@Override
public boolean updateQuery(final String sql) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public Boolean call(NettyClientBase client) throws ServiceException {
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return true;
- } else {
- if (response.hasErrorMessage()) {
- System.err.println("ERROR: " + response.getErrorMessage());
- }
- return false;
- }
+ if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return true;
+ } else {
+ if (response.hasErrorMessage()) {
+ LOG.error("ERROR: " + response.getErrorMessage());
}
- }.withRetries();
+ return false;
+ }
}
@Override
public boolean updateQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
- return true;
- } else {
- if (response.hasErrorMessage()) {
- System.err.println("ERROR: " + response.getErrorMessage());
- }
- return false;
- }
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(json);
+ builder.setIsJson(true);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+ if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+ return true;
+ } else {
+ if (response.hasErrorMessage()) {
+ LOG.error("ERROR: " + response.getErrorMessage());
}
- }.withRetries();
+ return false;
+ }
}
@Override
public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException {
- return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
- return res.getQueryListList();
-
- }
- }.withRetries();
+ ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
+ return res.getQueryListList();
}
@Override
public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException {
- return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
- return res.getQueryListList();
-
- }
- }.withRetries();
+ ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
+ return res.getQueryListList();
}
@Override
public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException {
- return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
- return res.getWorkerListList();
- }
-
- }.withRetries();
+ ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
+ return res.getWorkerListList();
}
@Override
@@ -540,7 +478,7 @@ public class QueryClientImpl implements QueryClient {
NettyClientBase tmClient = null;
try {
/* send a kill to the TM */
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
connection.checkSessionAndGet(tmClient);
@@ -581,25 +519,20 @@ public class QueryClientImpl implements QueryClient {
}
public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
- return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
- public QueryInfoProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
-
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
- if (res.getResultCode() == ResultCode.OK) {
- return res.getQueryInfo();
- } else {
- abort();
- throw new ServiceException(res.getErrorMessage());
- }
- }
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
+ if (res.getResultCode() == ResultCode.OK) {
+ return res.getQueryInfo();
+ } else {
+ throw new ServiceException(res.getErrorMessage());
+ }
}
public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@ -611,24 +544,31 @@ public class QueryClientImpl implements QueryClient {
InetSocketAddress qmAddress = new InetSocketAddress(
queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
- return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
- QueryMasterClientProtocol.class, false) {
- public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
+ RpcClientManager manager = RpcClientManager.getInstance();
+ NettyClientBase queryMasterClient;
+ try {
+ queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+ manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
+ try {
+ connection.checkSessionAndGet(connection.getTajoMasterConnection());
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
- GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
- if (res.getResultCode() == ResultCode.OK) {
- return res.getQueryHistory();
- } else {
- abort();
- throw new ServiceException(res.getErrorMessage());
- }
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+
+ QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+ GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
+ if (res.getResultCode() == ResultCode.OK) {
+ return res.getQueryHistory();
+ } else {
+ throw new ServiceException(res.getErrorMessage());
}
- }.withRetries();
+ } finally {
+ queryMasterClient.close();
+ }
}
}
[6/6] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/42bcf2de
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/42bcf2de
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/42bcf2de
Branch: refs/heads/index_support
Commit: 42bcf2de090bf1bb5b5ec711427654056a2866e2
Parents: 86c97b2 9b3824b
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 8 01:35:19 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 8 01:35:19 2015 +0900
----------------------------------------------------------------------
CHANGES | 10 +
.../tajo/catalog/AbstractCatalogClient.java | 638 +++++++------------
.../org/apache/tajo/catalog/CatalogClient.java | 49 +-
.../java/org/apache/tajo/catalog/Schema.java | 12 +-
.../org/apache/tajo/catalog/CatalogServer.java | 8 +-
.../tajo/catalog/LocalCatalogWrapper.java | 20 +-
.../tajo/client/CatalogAdminClientImpl.java | 518 ++++++++-------
.../org/apache/tajo/client/QueryClientImpl.java | 469 ++++++++------
.../apache/tajo/client/SessionConnection.java | 351 +++++-----
.../java/org/apache/tajo/conf/TajoConf.java | 2 -
.../org/apache/tajo/master/QueryInProgress.java | 31 +-
.../querymaster/QueryMasterManagerService.java | 135 ++--
.../tajo/worker/ExecutionBlockContext.java | 32 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 1 +
.../tajo/worker/TajoWorkerManagerService.java | 2 +
.../main/java/org/apache/tajo/worker/Task.java | 4 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 43 +-
.../src/main/proto/QueryMasterProtocol.proto | 14 +-
.../cli/tsql/TestDefaultCliOutputFormatter.java | 4 -
.../tajo/engine/planner/TestLogicalPlanner.java | 23 +-
.../tajo/engine/query/TestInsertQuery.java | 19 +
.../apache/tajo/querymaster/TestKillQuery.java | 63 +-
.../TestInsertQuery/nation_diff_col_order.ddl | 1 +
.../testInsertWithDifferentColumnOrder.sql | 1 +
.../testInsertWithDifferentColumnOrder.result | 27 +
.../org/apache/tajo/plan/LogicalPlanner.java | 9 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 10 +-
.../tajo/rpc/RetriesExhaustedException.java | 104 ---
.../org/apache/tajo/rpc/RpcClientManager.java | 9 +
.../org/apache/tajo/rpc/ServerCallable.java | 148 -----
.../org/apache/tajo/rpc/TestBlockingRpc.java | 39 --
.../org/apache/tajo/storage/rcfile/RCFile.java | 14 +-
.../tajo/storage/text/CSVLineDeserializer.java | 18 +-
.../org/apache/tajo/storage/TestStorages.java | 59 ++
34 files changed, 1365 insertions(+), 1522 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index b52b2f5,766f6c2..c872f8b
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -29,17 -29,12 +29,13 @@@ import org.apache.tajo.catalog.proto.Ca
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.RpcClientManager;
- import org.apache.tajo.rpc.ServerCallable;
+ import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
- import org.apache.tajo.service.ServiceTracker;
- import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.TUtil;
- import java.net.InetSocketAddress;
+ import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@@ -373,37 -269,14 +270,26 @@@ public abstract class AbstractCatalogCl
}
@Override
+ public List<IndexDescProto> getAllIndexes() {
+ try {
- return new ServerCallable<List<IndexDescProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<IndexDescProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
- return response.getIndexList();
- }
- }.withRetries();
++ CatalogProtocolService.BlockingInterface stub = getStub();
++ GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++ return response.getIndexList();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@@ -637,42 -458,15 +471,42 @@@
}
@Override
- public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
+ public boolean existIndexByColumns(final String databaseName, final String tableName, final Column [] columns) {
+ return existIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
+ }
+
+ @Override
+ public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- for (String colunName : columnNames) {
- builder.addColumnNames(colunName);
- }
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//<<<<<<< HEAD
++ GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
++ for (String colunName : columnNames) {
++ builder.addColumnNames(colunName);
++ }
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexByColumnNames(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
- return stub.existIndexByColumn(null, builder.build()).getValue();
++ return stub.existIndexByColumnNames(null, builder.build()).getValue();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean existIndexesByTable(final String databaseName, final String tableName) {
+ try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
- }
- }.withRetries();
++ CatalogProtocolService.BlockingInterface stub = getStub();
++ return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
++//=======
++// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++// builder.setColumnName(columnName);
++//
++// CatalogProtocolService.BlockingInterface stub = getStub();
++// return stub.existIndexByColumn(null, builder.build()).getValue();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@@ -699,62 -488,17 +528,61 @@@
}
}
+ private static String[] extractColumnNames(Column[] columns) {
+ String[] columnNames = new String [columns.length];
+ for (int i = 0; i < columnNames.length; i++) {
+ columnNames[i] = columns[i].getSimpleName();
+ }
+ return columnNames;
+ }
+
+ @Override
+ public final IndexDesc getIndexByColumns(final String databaseName,
+ final String tableName,
+ final Column [] columns) {
+ return getIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
+ }
+
@Override
- public final IndexDesc getIndexByColumn(final String databaseName,
- final String tableName,
- final String columnName) {
+ public final IndexDesc getIndexByColumnNames(final String databaseName,
+ final String tableName,
+ final String [] columnNames) {
try {
- return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public IndexDesc call(NettyClientBase client) throws ServiceException {
-
- GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- for (String columnName : columnNames) {
- builder.addColumnNames(columnName);
- }
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++// builder.setColumnName(columnName);
++//
++//<<<<<<< HEAD
++ GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
++ for (String columnName : columnNames) {
++ builder.addColumnNames(columnName);
++ }
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
- return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++ return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public final Collection<IndexDesc> getAllIndexesByTable(final String databaseName,
+ final String tableName) {
+ try {
- return new ServerCallable<Collection<IndexDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- @Override
- public Collection<IndexDesc> call(NettyClientBase client) throws Exception {
- TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
- List<IndexDesc> indexDescs = TUtil.newList();
- for (IndexDescProto descProto : response.getIndexDescList()) {
- indexDescs.add(new IndexDesc(descProto));
- }
- return indexDescs;
- }
- }.withRetries();
++ TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
++ CatalogProtocolService.BlockingInterface stub = getStub();
++ GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
++ List<IndexDesc> indexDescs = TUtil.newList();
++ for (IndexDescProto descProto : response.getIndexDescList()) {
++ indexDescs.add(new IndexDesc(descProto));
++ }
++ return indexDescs;
++//=======
++// CatalogProtocolService.BlockingInterface stub = getStub();
++// return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@@ -781,6 -520,18 +604,21 @@@
return false;
}
}
-
- @Override
- public List<IndexProto> getAllIndexes() {
- try {
- CatalogProtocolService.BlockingInterface stub = getStub();
- GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
- return response.getIndexList();
- } catch (ServiceException e) {
- LOG.error(e.getMessage(), e);
- return new ArrayList<IndexProto>();
- }
- }
++//<<<<<<< HEAD
++//=======
++//
++// @Override
++// public List<IndexProto> getAllIndexes() {
++// try {
++// CatalogProtocolService.BlockingInterface stub = getStub();
++// GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++// return response.getIndexList();
++// } catch (ServiceException e) {
++// LOG.error(e.getMessage(), e);
++// return new ArrayList<IndexProto>();
++// }
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
@Override
public final boolean createFunction(final FunctionDesc funcDesc) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 5fa1c67,9397fcf..5a04892
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@@ -20,16 -20,15 +20,15 @@@ package org.apache.tajo.client
import com.google.protobuf.ServiceException;
import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.ServerCallable;
import java.io.IOException;
import java.net.URI;
@@@ -132,32 -97,25 +97,57 @@@ public class CatalogAdminClientImpl imp
final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
throws SQLException, ServiceException {
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setName(tableName);
- builder.setSchema(schema.getProto());
- builder.setMeta(meta.getProto());
- builder.setPath(path.toString());
- if (partitionMethodDesc != null) {
- builder.setPartition(partitionMethodDesc.getProto());
- }
- ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
- if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setName(tableName);
+ builder.setSchema(schema.getProto());
+ builder.setMeta(meta.getProto());
+ builder.setPath(path.toString());
+ if (partitionMethodDesc != null) {
+ builder.setPartition(partitionMethodDesc.getProto());
+ }
+ ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++ throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+ }
++
++//<<<<<<< HEAD
++// return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++// }
++//
++// }.withRetries();
++//=======
++// NettyClientBase client = connection.getTajoMasterConnection();
++// connection.checkSessionAndGet(client);
++// BlockingInterface tajoMasterService = client.getStub();
++//
++// ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setName(tableName);
++// builder.setSchema(schema.getProto());
++// builder.setMeta(meta.getProto());
++// builder.setPath(path.toString());
++// if (partitionMethodDesc != null) {
++// builder.setPartition(partitionMethodDesc.getProto());
++// }
++// ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
++// if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++// return CatalogUtil.newTableDesc(res.getTableDesc());
++// } else {
++// throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
@Override
@@@ -211,169 -156,37 +188,222 @@@
@Override
public TableDesc getTableDesc(final String tableName) throws ServiceException {
-
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
- if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
--
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setTableName(tableName);
+ ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
- throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++ throw new ServiceException(new SQLException(res.getResult().getErrorMessage(),
++ SQLStates.ER_NO_SUCH_TABLE.getState()));
+ }
++
++//<<<<<<< HEAD
++// return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++// }
++//
++// }.withRetries();
++//=======
++// NettyClientBase client = connection.getTajoMasterConnection();
++// connection.checkSessionAndGet(client);
++// BlockingInterface tajoMasterService = client.getStub();
++//
++// ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setTableName(tableName);
++// ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
++// if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++// return CatalogUtil.newTableDesc(res.getTableDesc());
++// } else {
++// throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
@Override
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
-
- return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- String paramFunctionName = functionName == null ? "" : functionName;
- ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
- connection.convertSessionedString(paramFunctionName));
- if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return res.getFunctionsList();
- } else {
- throw new SQLException(res.getResult().getErrorMessage());
- }
- }
--
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ String paramFunctionName = functionName == null ? "" : functionName;
+ ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+ connection.convertSessionedString(paramFunctionName));
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return res.getFunctionsList();
+ } else {
- throw new ServiceException(new SQLException(res.getErrorMessage()));
++ throw new ServiceException(res.getResult().getErrorMessage());
++ }
++
++//<<<<<<< HEAD
++// return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++// }
++//
++// }.withRetries();
++//=======
++// NettyClientBase client = connection.getTajoMasterConnection();
++// connection.checkSessionAndGet(client);
++// BlockingInterface tajoMasterService = client.getStub();
++//
++// String paramFunctionName = functionName == null ? "" : functionName;
++// ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
++// connection.convertSessionedString(paramFunctionName));
++// if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++// return res.getFunctionsList();
++// } else {
++// throw new ServiceException(new SQLException(res.getErrorMessage()));
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
+ }
+
+ @Override
+ public IndexDescProto getIndex(final String indexName) throws ServiceException {
- return new ServerCallable<IndexDescProto>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public IndexDescProto call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getIndexWithName(null,
- connection.convertSessionedString(indexName));
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.getIndexWithName(null,
++ connection.convertSessionedString(indexName));
+ }
+
+ @Override
+ public boolean existIndex(final String indexName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existIndexWithName(null,
- connection.convertSessionedString(indexName)).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.existIndexWithName(null,
++ connection.convertSessionedString(indexName)).getValue();
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException {
- return new ServerCallable<List<IndexDescProto>>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public List<IndexDescProto> call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
- connection.convertSessionedString(tableName));
- if (response.getResult().getResultCode() == ResultCode.OK) {
- return response.getIndexesList();
- } else {
- throw new SQLException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
++ connection.convertSessionedString(tableName));
++ if (response.getResult().getResultCode() == ResultCode.OK) {
++ return response.getIndexesList();
++ } else {
++ throw new ServiceException(response.getResult().getErrorMessage());
++ }
++// return new ServerCallable<List<IndexDescProto>>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public List<IndexDescProto> call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public boolean hasIndexes(final String tableName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existIndexesForTable(null,
- connection.convertSessionedString(tableName)).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.existIndexesForTable(null,
++ connection.convertSessionedString(tableName)).getValue();
++
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException {
- return new ServerCallable<IndexDescProto>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public IndexDescProto call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- for (String eachColumnName : columnNames) {
- builder.addColumnNames(eachColumnName);
- }
- GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
- if (response.getResult().getResultCode() == ResultCode.OK) {
- return response.getIndexDesc();
- } else {
- throw new SQLException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++ builder.setSessionId(connection.sessionId);
++ builder.setTableName(tableName);
++ for (String eachColumnName : columnNames) {
++ builder.addColumnNames(eachColumnName);
++ }
++ GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
++ if (response.getResult().getResultCode() == ResultCode.OK) {
++ return response.getIndexDesc();
++ } else {
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
++
++// return new ServerCallable<IndexDescProto>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public IndexDescProto call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- for (String eachColumnName : columnName) {
- builder.addColumnNames(eachColumnName);
- }
- return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++ builder.setSessionId(connection.sessionId);
++ builder.setTableName(tableName);
++ for (String eachColumnName : columnName) {
++ builder.addColumnNames(eachColumnName);
++ }
++ return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
++
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public boolean dropIndex(final String indexName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.dropIndex(null,
- connection.convertSessionedString(indexName)).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.dropIndex(null,
++ connection.convertSessionedString(indexName)).getValue();
++
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 73abc4c,53889fe..007c010
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@@ -153,28 -153,21 +153,40 @@@ public class QueryClientImpl implement
@Override
public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
-
- return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
-
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
- SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
- if (response.getResult().getResultCode() == ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- }
- return response;
- }
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+
+ final QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
+ SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
- if (response.getResultCode() == ResultCode.OK) {
++ if (response.getResult().getResultCode() == ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ }
+ return response;
++
++//<<<<<<< HEAD
++// connection.checkSessionAndGet(client);
++//
++// final QueryRequest.Builder builder = QueryRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQuery(sql);
++// builder.setIsJson(false);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//
++//
++// }
++// }.withRetries();
++//=======
++// SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
++// if (response.getResultCode() == ResultCode.OK) {
++// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// }
++// return response;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
@Override
@@@ -369,42 -356,26 +375,60 @@@
throws ServiceException {
try {
- final ServerCallable<ClientProtos.SerializedResultSet> callable =
- new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
- builder.setFetchRowNum(fetchRowNum);
- try {
- GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
- if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
- abort();
- throw new ServiceException(response.getResult().getErrorMessage());
- }
-
- return response.getResultSet();
- } catch (ServiceException e) {
- abort();
- throw e;
- } catch (Throwable t) {
- throw new ServiceException(t.getMessage(), t);
- }
- }
- };
-
- ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//<<<<<<< HEAD
++// final ServerCallable<ClientProtos.SerializedResultSet> callable =
++// new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
++//
++// connection.checkSessionAndGet(client);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++// GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQueryId(queryId.getProto());
++// builder.setFetchRowNum(fetchRowNum);
++// try {
++// GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++// abort();
++// throw new ServiceException(response.getResult().getErrorMessage());
++// }
++//
++// return response.getResultSet();
++// } catch (ServiceException e) {
++// abort();
++// throw e;
++// } catch (Throwable t) {
++// throw new ServiceException(t.getMessage(), t);
++// }
++// }
++// };
++//
++// ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//=======
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+ builder.setFetchRowNum(fetchRowNum);
+
+ GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
- throw new ServiceException(response.getErrorMessage());
++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
+
+ ClientProtos.SerializedResultSet resultSet = response.getResultSet();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
return new TajoMemoryResultSet(queryId,
- new Schema(serializedResultSet.getSchema()),
- serializedResultSet.getSerializedTuplesList(),
- serializedResultSet.getSerializedTuplesCount(),
+ new Schema(resultSet.getSchema()),
+ resultSet.getSerializedTuplesList(),
+ resultSet.getSerializedTuplesCount(),
getClientSideSessionVars());
} catch (ServiceException e) {
throw e;
@@@ -416,59 -387,47 +440,92 @@@
@Override
public boolean updateQuery(final String sql) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-
- if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return true;
- } else {
- if (response.getResult().hasErrorMessage()) {
- System.err.println("ERROR: " + response.getResult().getErrorMessage());
- }
- return false;
- }
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++//<<<<<<< HEAD
++// connection.checkSessionAndGet(client);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++// QueryRequest.Builder builder = QueryRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQuery(sql);
++// builder.setIsJson(false);
++// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++//
++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// return true;
++// } else {
++// if (response.getResult().hasErrorMessage()) {
++// System.err.println("ERROR: " + response.getResult().getErrorMessage());
++// }
++// return false;
++// }
++//=======
++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return true;
+ } else {
- if (response.hasErrorMessage()) {
- LOG.error("ERROR: " + response.getErrorMessage());
++ if (response.getResult().hasErrorMessage()) {
++ LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ return false;
+ }
}
@Override
public boolean updateQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return true;
- } else {
- if (response.getResult().hasErrorMessage()) {
- System.err.println("ERROR: " + response.getResult().getErrorMessage());
- }
- return false;
- }
++//<<<<<<< HEAD
++// return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public Boolean call(NettyClientBase client) throws ServiceException {
++//
++// connection.checkSessionAndGet(client);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++// QueryRequest.Builder builder = QueryRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQuery(json);
++// builder.setIsJson(true);
++// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++// return true;
++// } else {
++// if (response.getResult().hasErrorMessage()) {
++// System.err.println("ERROR: " + response.getResult().getErrorMessage());
++// }
++// return false;
++// }
++//=======
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(json);
+ builder.setIsJson(true);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return true;
+ } else {
- if (response.hasErrorMessage()) {
- LOG.error("ERROR: " + response.getErrorMessage());
++ if (response.getResult().hasErrorMessage()) {
++ LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ return false;
+ }
}
@Override
@@@ -581,25 -519,20 +617,42 @@@
}
public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
- return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
- public QueryInfoProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
-
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
- if (res.getResult().getResultCode() == ResultCode.OK) {
- return res.getQueryInfo();
- } else {
- abort();
- throw new ServiceException(res.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++//<<<<<<< HEAD
++// return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++// public QueryInfoProto call(NettyClientBase client) throws ServiceException {
++// connection.checkSessionAndGet(client);
++//
++// QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQueryId(queryId.getProto());
++//
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++// GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
++// if (res.getResult().getResultCode() == ResultCode.OK) {
++// return res.getQueryInfo();
++// } else {
++// abort();
++// throw new ServiceException(res.getResult().getErrorMessage());
++// }
++// }
++// }.withRetries();
++//=======
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
- if (res.getResultCode() == ResultCode.OK) {
++ if (res.getResult().getResultCode() == ResultCode.OK) {
+ return res.getQueryInfo();
+ } else {
- throw new ServiceException(res.getErrorMessage());
++ throw new ServiceException(res.getResult().getErrorMessage());
+ }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@@ -611,24 -544,31 +664,42 @@@
InetSocketAddress qmAddress = new InetSocketAddress(
queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
- return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
- QueryMasterClientProtocol.class, false) {
- public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
+ RpcClientManager manager = RpcClientManager.getInstance();
+ NettyClientBase queryMasterClient;
+ try {
+ queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+ manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
+ try {
+ connection.checkSessionAndGet(connection.getTajoMasterConnection());
+
++//<<<<<<< HEAD
++// QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
++// GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
++// if (res.getResult().getResultCode() == ResultCode.OK) {
++// return res.getQueryHistory();
++// } else {
++// abort();
++// throw new ServiceException(res.getResult().getErrorMessage());
++// }
++//=======
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
- GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
- if (res.getResult().getResultCode() == ResultCode.OK) {
- return res.getQueryHistory();
- } else {
- abort();
- throw new ServiceException(res.getResult().getErrorMessage());
- }
+ QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+ GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
- if (res.getResultCode() == ResultCode.OK) {
++ if (res.getResult().getResultCode() == ResultCode.OK) {
+ return res.getQueryHistory();
+ } else {
- throw new ServiceException(res.getErrorMessage());
++ throw new ServiceException(res.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ } finally {
+ queryMasterClient.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index bb15981,84decd5..1bb0e16
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@@ -157,52 -160,43 +160,81 @@@ public class SessionConnection implemen
}
public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException {
- return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- KeyValueSet keyValueSet = new KeyValueSet();
- keyValueSet.putAll(variables);
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .setSessionVars(keyValueSet.getProto()).build();
-
- SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
-
- if (response.getResult().getResultCode() == ResultCode.OK) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw new ServiceException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++//<<<<<<< HEAD
++// return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public Map<String, String> call(NettyClientBase client) throws ServiceException {
++// checkSessionAndGet(client);
++//
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++// KeyValueSet keyValueSet = new KeyValueSet();
++// keyValueSet.putAll(variables);
++// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++// .setSessionId(sessionId)
++// .setSessionVars(keyValueSet.getProto()).build();
++//
++// SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
++//
++// if (response.getResult().getResultCode() == ResultCode.OK) {
++// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// return Collections.unmodifiableMap(sessionVarsCache);
++// } else {
++// throw new ServiceException(response.getResult().getErrorMessage());
++// }
++// }
++// }.withRetries();
++// }
++//=======
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
+
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.putAll(variables);
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .setSessionVars(keyValueSet.getProto()).build();
+
+ SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+
- if (response.getResultCode() == ResultCode.OK) {
++ if (response.getResult().getResultCode() == ResultCode.OK) {
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
+ } else {
- throw new ServiceException(response.getMessage());
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
}
- public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
- return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .addAllUnsetVariables(variables).build();
-
- SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
-
- if (response.getResult().getResultCode() == ResultCode.OK) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw new ServiceException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
+ public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
+
++//<<<<<<< HEAD
++// if (response.getResult().getResultCode() == ResultCode.OK) {
++// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// return Collections.unmodifiableMap(sessionVarsCache);
++// } else {
++// throw new ServiceException(response.getResult().getErrorMessage());
++// }
++// }
++// }.withRetries();
++//=======
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .addAllUnsetVariables(variables).build();
+
+ SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+
- if (response.getResultCode() == ResultCode.OK) {
++ if (response.getResult().getResultCode() == ResultCode.OK) {
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
+ } else {
- throw new ServiceException(response.getMessage());
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
void updateSessionVarsCache(Map<String, String> variables) {
@@@ -333,55 -308,51 +346,81 @@@
}
public boolean reconnect() throws Exception {
- return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
- builder.setUsername(userInfo.getUserName()).build();
- if (baseDatabase != null) {
- builder.setBaseDatabaseName(baseDatabase);
- }
-
+ CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+ builder.setUsername(userInfo.getUserName()).build();
+ if (baseDatabase != null) {
+ builder.setBaseDatabaseName(baseDatabase);
+ }
- // create new session
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
- if (response.getResult().getResultCode() != ResultCode.OK) {
- return false;
- }
+ NettyClientBase client = getTajoMasterConnection();
+
++//<<<<<<< HEAD
++// // create new session
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++// CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
++// if (response.getResult().getResultCode() != ResultCode.OK) {
++// return false;
++// }
++//=======
+ // create new session
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
- if (response.getResultCode() != ResultCode.OK) {
++ if (response.getResult().getResultCode() != ResultCode.OK) {
+ return false;
+ }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
- // Invalidate some session variables in client cache
- sessionId = response.getSessionId();
- Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
- synchronized (sessionVarsCache) {
- for (SessionVars var : UPDATE_ON_RECONNECT) {
- String value = sessionVars.get(var.keyname());
- if (value != null) {
- sessionVarsCache.put(var.keyname(), value);
- }
- }
+ // Invalidate some session variables in client cache
+ sessionId = response.getSessionId();
+ Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+ synchronized (sessionVarsCache) {
+ for (SessionVars var : UPDATE_ON_RECONNECT) {
+ String value = sessionVars.get(var.keyname());
+ if (value != null) {
+ sessionVarsCache.put(var.keyname(), value);
}
+ }
+ }
- // Update the session variables in server side
- try {
- KeyValueSet keyValueSet = new KeyValueSet();
- keyValueSet.putAll(sessionVarsCache);
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .setSessionVars(keyValueSet.getProto()).build();
-
- if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
- tajoMasterService.removeSession(null, sessionId);
- return false;
- }
- LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
- return true;
- } catch (ServiceException e) {
- tajoMasterService.removeSession(null, sessionId);
- return false;
- }
++//<<<<<<< HEAD
++// // Update the session variables in server side
++// try {
++// KeyValueSet keyValueSet = new KeyValueSet();
++// keyValueSet.putAll(sessionVarsCache);
++// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++// .setSessionId(sessionId)
++// .setSessionVars(keyValueSet.getProto()).build();
++//
++// if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
++// tajoMasterService.removeSession(null, sessionId);
++// return false;
++// }
++// LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
++// return true;
++// } catch (ServiceException e) {
++// tajoMasterService.removeSession(null, sessionId);
++// return false;
++// }
++//=======
+ // Update the session variables in server side
+ try {
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.putAll(sessionVarsCache);
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .setSessionVars(keyValueSet.getProto()).build();
+
- if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
++ if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+ return true;
+ } catch (ServiceException e) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index be1cdf2,b1a27fa..7ad658f
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -157,24 -153,55 +153,55 @@@ public class TestKillQuery
@Test
public final void testIgnoreStageStateFromKilled() throws Exception {
- ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
- QueryId queryId = new QueryId(res.getQueryId());
- cluster.waitForQuerySubmitted(queryId);
+ SQLAnalyzer analyzer = new SQLAnalyzer();
+ QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+ Session session = LocalTajoTestingUtility.createDummySession();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+
+ LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
++ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+ Expr expr = analyzer.parse(queryStr);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+ optimizer.optimize(plan);
+
+ QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+ QueryContext queryContext = new QueryContext(conf);
+ MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+ GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+ globalPlanner.build(masterPlan);
- QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
- Query query = qmt.getQuery();
+ CountDownLatch barrier = new CountDownLatch(1);
+ MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+
+ QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+ QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+ queryId, session, defaultContext, expr.toJson(), dispatch);
- // wait for a stage created
- cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
- query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+ queryMasterTask.init(conf);
+ queryMasterTask.getQueryTaskContext().getDispatcher().start();
+ queryMasterTask.startQuery();
try{
- cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
- } finally {
- assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+ barrier.await(5000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+ }
+
+ Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+ assertNotNull(stage);
+
+ // fire kill event
+ queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+ try {
+ cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+ } finally {
+ queryMasterTask.stop();
}
- List<Stage> stages = Lists.newArrayList(query.getStages());
+ List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
Stage lastStage = stages.get(stages.size() - 1);
assertEquals(StageState.KILLED, lastStage.getSynchronizedState());
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------