You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/16 13:59:40 UTC
[1/2] TAJO-257: Unit tests occasionally fail. (hyunsik)
Updated Branches:
refs/heads/master 202aa29b7 -> fd60ec395
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java
deleted file mode 100644
index 26541a4..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Invocation;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Response;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-@Deprecated
-public class NettyRpcServer extends NettyServerBase {
- private static Log LOG = LogFactory.getLog(NettyRpcServer.class);
- private final Object instance;
- private final Class<?> clazz;
- private final ChannelPipelineFactory pipeline;
- private Map<String, Method> methods;
- private Map<String, Method> builderMethods;
-
- @Deprecated
- public NettyRpcServer(Object proxy, InetSocketAddress bindAddress) {
- super(bindAddress);
- this.instance = proxy;
- this.clazz = instance.getClass();
- this.methods = new HashMap<String, Method>();
- this.builderMethods = new HashMap<String, Method>();
- this.pipeline =
- new ProtoPipelineFactory(new ServerHandler(),
- Invocation.getDefaultInstance());
-
- super.init(this.pipeline);
- for (Method m : this.clazz.getDeclaredMethods()) {
- String methodName = m.getName();
- Class<?> params[] = m.getParameterTypes();
- try {
- methods.put(methodName, m);
-
- Method mtd = params[0].getMethod("newBuilder", new Class[] {});
- builderMethods.put(methodName, mtd);
- } catch (Exception e) {
- e.printStackTrace();
- continue;
- }
- }
- }
-
- public NettyRpcServer(Object proxy, Class<?> interfaceClass,
- InetSocketAddress bindAddress) {
- super(bindAddress);
- this.instance = proxy;
- this.clazz = instance.getClass();
- this.methods = new HashMap<String, Method>();
- this.builderMethods = new HashMap<String, Method>();
- this.pipeline =
- new ProtoPipelineFactory(new ServerHandler(),
- Invocation.getDefaultInstance());
-
- super.init(this.pipeline);
- for (Method m : interfaceClass.getDeclaredMethods()) {
- String methodName = m.getName();
- Class<?> params[] = m.getParameterTypes();
- try {
- methods.put(methodName, this.clazz.getMethod(methodName, params));
-
- Method mtd = params[0].getMethod("newBuilder", new Class[] {});
- builderMethods.put(methodName, mtd);
- } catch (Exception e) {
- e.printStackTrace();
- continue;
- }
- }
- }
-
- private class ServerHandler extends SimpleChannelUpstreamHandler {
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- final Invocation request = (Invocation) e.getMessage();
- Object res = null;
- Response response = null;
-
- try {
- String methodName = request.getMethodName();
- if (methods.containsKey(methodName) == false) {
- throw new NoSuchMethodException(methodName);
- }
-
- Method method = methods.get(methodName);
- Method builderGenMethod = builderMethods.get(methodName);
- Builder builder =
- (Builder) builderGenMethod.invoke(null, new Object[] {});
- Message msg = builder.mergeFrom(request.getParam(0)).build();
- res = method.invoke(instance, msg);
-
- } catch (InvocationTargetException internalException) {
- LOG.error(ExceptionUtils.getStackTrace(internalException
- .getTargetException()));
- response =
- Response
- .newBuilder()
- .setId(request.getId())
- .setHasReturn(false)
- .setExceptionMessage(
- internalException.getTargetException().toString()).build();
- e.getChannel().write(response);
- return;
- } catch (Exception otherException) {
- otherException.printStackTrace();
- response =
- Response.newBuilder().setId(request.getId()).setHasReturn(false)
- .setExceptionMessage(otherException.toString()).build();
- e.getChannel().write(response);
- return;
- }
-
- if (res == null) {
- response =
- Response.newBuilder().setId(request.getId()).setHasReturn(false)
- .build();
- } else {
- ByteString str = ((Message) res).toByteString();
- response =
- Response.newBuilder().setId(request.getId()).setHasReturn(true)
- .setReturnValue(str).build();
- }
- e.getChannel().write(response);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- e.getChannel().close();
- LOG.error(e.getCause());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
deleted file mode 100644
index d78e4e1..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
-import org.jboss.netty.channel.*;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ProtoAsyncRpcClient extends NettyClientBase {
- private static final Log LOG = LogFactory.getLog(RpcProtos.class);
-
- private final ChannelUpstreamHandler handler;
- private final ChannelPipelineFactory pipeFactory;
- private final ProxyRpcChannel rpcChannel;
-
- private final AtomicInteger sequence = new AtomicInteger(0);
- private final Map<Integer, ResponseCallback> requests =
- new ConcurrentHashMap<Integer, ResponseCallback>();
-
- private final Class<?> protocol;
- private final Method stubMethod;
-
- public ProtoAsyncRpcClient(final Class<?> protocol,
- final InetSocketAddress addr)
- throws Exception {
-
- this.protocol = protocol;
- String serviceClassName = protocol.getName() + "$"
- + protocol.getSimpleName() + "Service";
- Class<?> serviceClass = Class.forName(serviceClassName);
- stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
-
- this.handler = new ClientChannelUpstreamHandler();
- pipeFactory = new ProtoPipelineFactory(handler,
- RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory);
- rpcChannel = new ProxyRpcChannel(getChannel());
- }
-
- public <T> T getStub() throws Exception {
- return (T) stubMethod.invoke(null, rpcChannel);
- }
-
- public RpcChannel getRpcChannel() {
- return this.rpcChannel;
- }
-
- private class ProxyRpcChannel implements RpcChannel {
- private final Channel channel;
- private final ClientChannelUpstreamHandler handler;
-
- public ProxyRpcChannel(Channel channel) {
- this.channel = channel;
- this.handler = channel.getPipeline()
- .get(ClientChannelUpstreamHandler.class);
-
- if (handler == null) {
- throw new IllegalArgumentException("Channel does not have " +
- "proper handler");
- }
- }
-
- public void callMethod(final MethodDescriptor method,
- final RpcController controller,
- final Message param,
- final Message responseType,
- RpcCallback<Message> done) {
-
- int nextSeqId = sequence.getAndIncrement();
-
- Message rpcRequest = buildRequest(nextSeqId, method, param);
-
- handler.registerCallback(nextSeqId,
- new ResponseCallback(controller, responseType, done));
- channel.write(rpcRequest);
- }
-
- private Message buildRequest(int seqId,
- MethodDescriptor method,
- Message param) {
-
- RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
- .setId(seqId)
- .setMethodName(method.getName());
-
- if (param != null) {
- requestBuilder.setRequestMessage(param.toByteString());
- }
-
- return requestBuilder.build();
- }
- }
-
- private class ResponseCallback implements RpcCallback<RpcResponse> {
- private final RpcController controller;
- private final Message responsePrototype;
- private final RpcCallback<Message> callback;
-
- public ResponseCallback(RpcController controller,
- Message responsePrototype,
- RpcCallback<Message> callback) {
- this.controller = controller;
- this.responsePrototype = responsePrototype;
- this.callback = callback;
- }
-
- public void run(RpcResponse rpcResponse) {
-
- // if hasErrorMessage is true, it means rpc-level errors.
- // it does not call the callback function
- if (rpcResponse.hasErrorMessage()) {
-
- if (controller != null) {
- this.controller.setFailed(rpcResponse.getErrorMessage());
- }
- callback.run(null);
- throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
-
- } else { // if rpc call succeed
-
- try {
- Message responseMessage;
- if (!rpcResponse.hasResponseMessage()) {
- responseMessage = null;
- } else {
- responseMessage = responsePrototype.newBuilderForType().mergeFrom(
- rpcResponse.getResponseMessage()).build();
- }
-
- callback.run(responseMessage);
-
- } catch (InvalidProtocolBufferException e) {
- throw new RemoteException(getErrorMessage(""), e);
- }
- }
- }
- }
-
- private String getErrorMessage(String message) {
- return "Exception [" + protocol.getCanonicalName() +
- "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
- getChannel().getRemoteAddress()) + ")]: " + message;
- }
-
- private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
-
- synchronized void registerCallback(int seqId, ResponseCallback callback) {
-
- if (requests.containsKey(seqId)) {
- throw new RemoteException(
- getErrorMessage("Duplicate Sequence Id "+ seqId));
- }
-
- requests.put(seqId, callback);
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
-
- RpcResponse response = (RpcResponse) e.getMessage();
- ResponseCallback callback = requests.remove(response.getId());
-
- if (callback == null) {
- LOG.warn("Dangling rpc call");
- } else {
- callback.run(response);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- LOG.error(addr + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
- e.getChannel().close();
- throw new RemoteException(getErrorMessage(addr.toString()), e.getCause());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
deleted file mode 100644
index 1a6ce29..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-public class ProtoAsyncRpcServer extends NettyServerBase {
- private static final Log LOG = LogFactory.getLog(ProtoAsyncRpcServer.class);
-
- private final Service service;
- private final ChannelPipelineFactory pipeline;
-
- public ProtoAsyncRpcServer(final Class<?> protocol,
- final Object instance,
- final InetSocketAddress bindAddress)
- throws Exception {
- super(protocol.getSimpleName(), bindAddress);
-
- String serviceClassName = protocol.getName() + "$" +
- protocol.getSimpleName() + "Service";
- Class<?> serviceClass = Class.forName(serviceClassName);
- Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface");
- Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
- this.service = (Service) method.invoke(null, instance);
-
- ServerHandler handler = new ServerHandler();
- this.pipeline = new ProtoPipelineFactory(handler,
- RpcRequest.getDefaultInstance());
- super.init(this.pipeline);
- }
-
- private class ServerHandler extends SimpleChannelUpstreamHandler {
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
-
- final RpcRequest request = (RpcRequest) e.getMessage();
-
- String methodName = request.getMethodName();
- MethodDescriptor methodDescriptor = service.getDescriptorForType().
- findMethodByName(methodName);
-
- if (methodDescriptor == null) {
- throw new RemoteCallException(request.getId(),
- new NoSuchMethodException(methodName));
- }
-
- Message paramProto = null;
- if (request.hasRequestMessage()) {
- try {
- paramProto = service.getRequestPrototype(methodDescriptor)
- .newBuilderForType().mergeFrom(request.getRequestMessage()).
- build();
- } catch (Throwable t) {
- throw new RemoteCallException(request.getId(), methodDescriptor, t);
- }
- }
-
- final Channel channel = e.getChannel();
- final RpcController controller = new NettyRpcController();
-
- RpcCallback<Message> callback =
- !request.hasId() ? null : new RpcCallback<Message>() {
-
- public void run(Message returnValue) {
-
- RpcResponse.Builder builder = RpcResponse.newBuilder()
- .setId(request.getId());
-
- if (returnValue != null) {
- builder.setResponseMessage(returnValue.toByteString());
- }
-
- if (controller.failed()) {
- builder.setErrorMessage(controller.errorText());
- }
-
- channel.write(builder.build());
- }
- };
-
- service.callMethod(methodDescriptor, controller, paramProto, callback);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception{
- if (e.getCause() instanceof RemoteCallException) {
- RemoteCallException callException = (RemoteCallException) e.getCause();
- e.getChannel().write(callException.getResponse());
- }
- throw new RemoteException(e.getCause());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
deleted file mode 100644
index 2a0b01f..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
-import org.jboss.netty.channel.*;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ProtoBlockingRpcClient extends NettyClientBase {
- private static final Log LOG = LogFactory.getLog(RpcProtos.class);
-
- private final ChannelUpstreamHandler handler;
- private final ChannelPipelineFactory pipeFactory;
- private final ProxyRpcChannel rpcChannel;
-
- private final AtomicInteger sequence = new AtomicInteger(0);
- private final Map<Integer, ProtoCallFuture> requests =
- new ConcurrentHashMap<Integer, ProtoCallFuture>();
-
- private final Class<?> protocol;
- private final Method stubMethod;
-
- public ProtoBlockingRpcClient(final Class<?> protocol,
- final InetSocketAddress addr)
- throws Exception {
-
- this.protocol = protocol;
- String serviceClassName = protocol.getName() + "$"
- + protocol.getSimpleName() + "Service";
- Class<?> serviceClass = Class.forName(serviceClassName);
- stubMethod = serviceClass.getMethod("newBlockingStub",
- BlockingRpcChannel.class);
-
- this.handler = new ClientChannelUpstreamHandler();
- pipeFactory = new ProtoPipelineFactory(handler,
- RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory);
- rpcChannel = new ProxyRpcChannel(getChannel());
- }
-
- public <T> T getStub() throws Exception {
- return (T) stubMethod.invoke(null, rpcChannel);
- }
-
- public BlockingRpcChannel getBlockingRpcChannel() {
- return this.rpcChannel;
- }
-
- private class ProxyRpcChannel implements BlockingRpcChannel {
- private final Channel channel;
- private final ClientChannelUpstreamHandler handler;
-
- public ProxyRpcChannel(Channel channel) {
- this.channel = channel;
- this.handler = channel.getPipeline().
- get(ClientChannelUpstreamHandler.class);
-
- if (handler == null) {
- throw new IllegalArgumentException("Channel does not have " +
- "proper handler");
- }
- }
-
- public Message callBlockingMethod(final MethodDescriptor method,
- final RpcController controller,
- final Message param,
- final Message responsePrototype)
- throws ServiceException {
-
- int nextSeqId = sequence.getAndIncrement();
-
- Message rpcRequest = buildRequest(nextSeqId, method, param);
-
- ProtoCallFuture callFuture =
- new ProtoCallFuture(controller, responsePrototype);
- requests.put(nextSeqId, callFuture);
- channel.write(rpcRequest);
-
- try {
- return callFuture.get();
- } catch (Throwable t) {
- throw new RemoteException(t);
- }
- }
-
- private Message buildRequest(int seqId,
- MethodDescriptor method,
- Message param) {
- RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
- .setId(seqId)
- .setMethodName(method.getName());
-
- if (param != null) {
- requestBuilder.setRequestMessage(param.toByteString());
- }
-
- return requestBuilder.build();
- }
- }
-
- private String getErrorMessage(String message) {
- if(protocol != null && getChannel() != null) {
- return "Exception [" + protocol.getCanonicalName() +
- "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
- getChannel().getRemoteAddress()) + ")]: " + message;
- } else {
- return "Exception " + message;
- }
- }
-
- private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
-
- RpcResponse rpcResponse = (RpcResponse) e.getMessage();
- ProtoCallFuture callback = requests.remove(rpcResponse.getId());
-
- if (callback == null) {
- LOG.warn("Dangling rpc call");
- } else {
- if (rpcResponse.hasErrorMessage()) {
- if (callback.controller != null) {
- callback.setFailed(rpcResponse.getErrorMessage());
- }
- throw new RemoteException(
- getErrorMessage(rpcResponse.getErrorMessage()));
- } else {
- Message responseMessage;
-
- if (!rpcResponse.hasResponseMessage()) {
- responseMessage = null;
- } else {
- responseMessage =
- callback.returnType.newBuilderForType().
- mergeFrom(rpcResponse.getResponseMessage()).build();
- }
-
- callback.setResponse(responseMessage);
- }
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- e.getChannel().close();
- throw new RemoteException(getErrorMessage(""), e.getCause());
- }
- }
-
- class ProtoCallFuture implements Future<Message> {
- private Semaphore sem = new Semaphore(0);
- private Message response = null;
- private Message returnType;
-
- private RpcController controller;
-
- public ProtoCallFuture(RpcController controller, Message message) {
- this.controller = controller;
- this.returnType = message;
- }
-
- @Override
- public boolean cancel(boolean arg0) {
- return false;
- }
-
- @Override
- public Message get() throws InterruptedException, ExecutionException {
- sem.acquire();
- return response;
- }
-
- @Override
- public Message get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- if(sem.tryAcquire(timeout, unit)) {
- return response;
- } else {
- throw new TimeoutException();
- }
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return sem.availablePermits() > 0;
- }
-
- public void setResponse(Message response) {
- this.response = response;
- sem.release();
- }
-
- public void setFailed(String errorText) {
- this.controller.setFailed(errorText);
- sem.release();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
deleted file mode 100644
index cc47b7b..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-public class ProtoBlockingRpcServer extends NettyServerBase {
- private static Log LOG = LogFactory.getLog(ProtoBlockingRpcServer.class);
- private final BlockingService service;
- private final ChannelPipelineFactory pipeline;
-
- public ProtoBlockingRpcServer(final Class<?> protocol,
- final Object instance,
- final InetSocketAddress bindAddress)
- throws Exception {
-
- super(protocol.getSimpleName(), bindAddress);
-
- String serviceClassName = protocol.getName() + "$" +
- protocol.getSimpleName() + "Service";
- Class<?> serviceClass = Class.forName(serviceClassName);
- Class<?> interfaceClass = Class.forName(serviceClassName +
- "$BlockingInterface");
- Method method = serviceClass.getMethod(
- "newReflectiveBlockingService", interfaceClass);
-
- this.service = (BlockingService) method.invoke(null, instance);
- this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
- RpcRequest.getDefaultInstance());
-
- super.init(this.pipeline);
- }
-
- private class ServerHandler extends SimpleChannelUpstreamHandler {
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- final RpcRequest request = (RpcRequest) e.getMessage();
-
- String methodName = request.getMethodName();
- MethodDescriptor methodDescriptor =
- service.getDescriptorForType().findMethodByName(methodName);
-
- if (methodDescriptor == null) {
- throw new RemoteCallException(request.getId(),
- new NoSuchMethodException(methodName));
- }
- Message paramProto = null;
- if (request.hasRequestMessage()) {
- try {
- paramProto = service.getRequestPrototype(methodDescriptor)
- .newBuilderForType().mergeFrom(request.getRequestMessage()).
- build();
-
- } catch (Throwable t) {
- throw new RemoteCallException(request.getId(), methodDescriptor, t);
- }
- }
- Message returnValue;
- RpcController controller = new NettyRpcController();
-
- try {
- returnValue = service.callBlockingMethod(methodDescriptor,
- controller, paramProto);
- } catch (Throwable t) {
- throw new RemoteCallException(request.getId(), methodDescriptor, t);
- }
-
- RpcResponse.Builder builder =
- RpcResponse.newBuilder().setId(request.getId());
-
- if (returnValue != null) {
- builder.setResponseMessage(returnValue.toByteString());
- }
-
- if (controller.failed()) {
- builder.setErrorMessage(controller.errorText());
- }
- e.getChannel().write(builder.build());
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- if (e.getCause() instanceof RemoteCallException) {
- RemoteCallException callException = (RemoteCallException) e.getCause();
- e.getChannel().write(callException.getResponse());
- }
-
- throw new RemoteException(e.getCause());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java
deleted file mode 100644
index f38ab9d..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.benchmark;
-
-import org.apache.tajo.rpc.Callback;
-import org.apache.tajo.rpc.NettyRpc;
-import org.apache.tajo.rpc.NettyRpcServer;
-import org.apache.tajo.rpc.RemoteException;
-
-import java.net.InetSocketAddress;
-
-import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-
-public class BenchmarkAsyncRPC {
-
- public static class ClientWrapper extends Thread {
- public void run() {
- BenchmarkClientInterface service;
- service =
- (BenchmarkClientInterface) NettyRpc.getProtoParamAsyncRpcProxy(
- BenchmarkServerInterface.class, BenchmarkClientInterface.class,
- new InetSocketAddress(15010));
-
- long start = System.currentTimeMillis();
- Callback<StringProto> cb = new Callback<StringProto>();
- StringProto ps = StringProto.newBuilder().setValue("ABCD").build();
- for (int i = 0; i < 100000; i++) {
- service.shoot(cb, ps);
- }
- long end = System.currentTimeMillis();
-
- System.out.println("elapsed time: " + (end - start) + "msc");
- }
- }
-
- public static interface BenchmarkClientInterface {
- public void shoot(Callback<StringProto> ret, StringProto l)
- throws RemoteException;
- }
-
- public static interface BenchmarkServerInterface {
- public StringProto shoot(StringProto l) throws RemoteException;
- }
-
- public static class BenchmarkImpl implements BenchmarkServerInterface {
- @Override
- public StringProto shoot(StringProto l) {
- return l;
- }
- }
-
- public static void main(String[] args) throws Exception {
- NettyRpcServer rpcServer =
- NettyRpc.getProtoParamRpcServer(new BenchmarkImpl(),
- BenchmarkServerInterface.class, new InetSocketAddress(15010));
- rpcServer.start();
- Thread.sleep(1000);
-
- int numThreads = 1;
- ClientWrapper client[] = new ClientWrapper[numThreads];
- for (int i = 0; i < numThreads; i++) {
- client[i] = new ClientWrapper();
- }
-
- for (int i = 0; i < numThreads; i++) {
- client[i].start();
- }
-
- for (int i = 0; i < numThreads; i++) {
- client[i].join();
- }
-
- rpcServer.shutdown();
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java
deleted file mode 100644
index 5ca9bd0..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.benchmark;
-
-import org.apache.tajo.rpc.NettyRpc;
-import org.apache.tajo.rpc.NettyRpcServer;
-import org.apache.tajo.rpc.RemoteException;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-
-import java.net.InetSocketAddress;
-
-public class BenchmarkBlockingRPC {
-
- public static class ClientWrapper extends Thread {
- @SuppressWarnings("unused")
- public void run() {
- InetSocketAddress addr = new InetSocketAddress("localhost", 15001);
- BenchmarkInterface proxy;
- proxy =
- (BenchmarkInterface) NettyRpc.getProtoParamBlockingRpcProxy(
- BenchmarkInterface.class, addr);
-
- long start = System.currentTimeMillis();
- StringProto ps = StringProto.newBuilder().setValue("ABCD").build();
- for (int i = 0; i < 100000; i++) {
- try {
- proxy.shoot(ps);
- } catch (RemoteException e1) {
- System.out.println(e1.getMessage());
- }
- }
- long end = System.currentTimeMillis();
- System.out.println("elapsed time: " + (end - start) + "msc");
-
- }
- }
-
- public static interface BenchmarkInterface {
- public StringProto shoot(StringProto l) throws RemoteException;
- }
-
- public static class BenchmarkImpl implements BenchmarkInterface {
- @Override
- public StringProto shoot(StringProto l) {
- return l;
- }
- }
-
- public static void main(String[] args) throws InterruptedException,
- RemoteException {
-
- NettyRpcServer server =
- NettyRpc
- .getProtoParamRpcServer(new BenchmarkImpl(),
- BenchmarkInterface.class, new InetSocketAddress("localhost",
- 15001));
-
- server.start();
- Thread.sleep(1000);
-
- int numThreads = 1;
- ClientWrapper client[] = new ClientWrapper[numThreads];
- for (int i = 0; i < numThreads; i++) {
- client[i] = new ClientWrapper();
- }
-
- for (int i = 0; i < numThreads; i++) {
- client[i].start();
- }
-
- for (int i = 0; i < numThreads; i++) {
- client[i].join();
- }
-
- server.shutdown();
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java
deleted file mode 100644
index 6487fde..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.benchmark;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.ipc.VersionedProtocol;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-public class BenchmarkHadoopRPC {
-
- public static class ClientWrapper extends Thread {
- @SuppressWarnings("unused")
- public void run() {
- InetSocketAddress addr = new InetSocketAddress("localhost", 15000);
- BenchmarkInterface proxy = null;
- try {
- proxy =
- RPC.waitForProxy(BenchmarkInterface.class, 1,
- addr, new Configuration());
- } catch (IOException e1) {
- e1.printStackTrace();
- }
-
- long start = System.currentTimeMillis();
- for (int i = 0; i < 100000; i++) {
- proxy.shoot("ABCD");
- }
- long end = System.currentTimeMillis();
- System.out.println("elapsed time: " + (end - start) + "msc");
- }
- }
-
- public static interface BenchmarkInterface extends VersionedProtocol {
- public String shoot(String l);
- }
-
- public static class BenchmarkImpl implements BenchmarkInterface {
- @Override
- public String shoot(String l) {
- return l;
- }
-
- @Override
- public long getProtocolVersion(String arg0, long arg1) throws IOException {
- return 1l;
- }
-
- @Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- ProtocolSignature ps = null;
- try {
- ps = ProtocolSignature.getProtocolSignature("benchmarkInterface", 0);
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
-
- return ps;
- }
- }
-
- public static void main(String[] args) throws InterruptedException,
- IOException {
- Server server =
- RPC.getServer(new BenchmarkImpl(), "localhost", 15000,
- new Configuration());
- server.start();
- Thread.sleep(1000);
-
- int numThreads = 1;
- ClientWrapper client[] = new ClientWrapper[numThreads];
- for (int i = 0; i < numThreads; i++) {
- client[i] = new ClientWrapper();
- }
-
- for (int i = 0; i < numThreads; i++) {
- client[i].start();
- }
-
- for (int i = 0; i < numThreads; i++) {
- client[i].join();
- }
-
- server.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto b/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto
deleted file mode 100644
index f8cac2a..0000000
--- a/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc";
-option java_outer_classname = "ProtoParamRpcProtocol";
-option java_generic_services = true;
-
-import "ProtoParamRpcProtos.proto";
-
-service ProtoParamRpcService {
- rpc invoke (Invocation) returns (Response);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto b/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto
deleted file mode 100644
index 5cde52f..0000000
--- a/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc";
-option java_outer_classname = "ProtoParamRpcProtos";
-
-message Invocation {
- required int32 id = 1;
- required string method_name = 2;
- repeated bytes param = 3;
-}
-
-message Response {
- required int32 id = 1;
- required bool has_return = 2;
- optional bytes return_value = 3;
- optional string exception_message = 4;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
new file mode 100644
index 0000000..9ae883b
--- /dev/null
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.apache.tajo.util.NetUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+public class TestAsyncRpc {
+ private static Log LOG = LogFactory.getLog(TestAsyncRpc.class);
+ private static String MESSAGE = "TestAsyncRpc";
+
+ double sum;
+ String echo;
+
+ static AsyncRpcServer server;
+ static AsyncRpcClient client;
+ static Interface stub;
+ static DummyProtocolAsyncImpl service;
+
+ @Before
+ public void setUp() throws Exception {
+ service = new DummyProtocolAsyncImpl();
+ server = new AsyncRpcServer(DummyProtocol.class,
+ service, new InetSocketAddress("127.0.0.1", 0));
+ server.start();
+ client = new AsyncRpcClient(DummyProtocol.class,
+ NetUtils.getConnectAddress(server.getListenAddress()));
+ stub = client.getStub();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if(client != null) {
+ client.close();
+ }
+ if(server != null) {
+ server.shutdown();
+ }
+ }
+
+ boolean calledMarker = false;
+ @Test
+ public void testRpc() throws Exception {
+
+ SumRequest sumRequest = SumRequest.newBuilder()
+ .setX1(1)
+ .setX2(2)
+ .setX3(3.15d)
+ .setX4(2.0f).build();
+
+ stub.sum(null, sumRequest, new RpcCallback<SumResponse>() {
+ @Override
+ public void run(SumResponse parameter) {
+ sum = parameter.getResult();
+ assertTrue(8.15d == sum);
+ }
+ });
+
+
+ EchoMessage echoMessage = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+ RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
+ @Override
+ public void run(EchoMessage parameter) {
+ echo = parameter.getMessage();
+ assertEquals(MESSAGE, echo);
+ calledMarker = true;
+ }
+ };
+ stub.echo(null, echoMessage, callback);
+ Thread.sleep(1000);
+ assertTrue(calledMarker);
+ }
+
+ private CountDownLatch testNullLatch;
+
+ @Test
+ public void testGetNull() throws Exception {
+ testNullLatch = new CountDownLatch(1);
+ stub.getNull(null, null, new RpcCallback<EchoMessage>() {
+ @Override
+ public void run(EchoMessage parameter) {
+ assertNull(parameter);
+ LOG.info("testGetNull retrieved");
+ testNullLatch.countDown();
+ }
+ });
+ testNullLatch.await(1000, TimeUnit.MILLISECONDS);
+ assertTrue(service.getNullCalled);
+ }
+
+ @Test
+ public void testCallFuture() throws Exception {
+ EchoMessage echoMessage = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+ CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+ stub.deley(null, echoMessage, future);
+
+ assertFalse(future.isDone());
+ assertEquals(future.get(), echoMessage);
+ assertTrue(future.isDone());
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testCallFutureTimeout() throws Exception {
+ EchoMessage echoMessage = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+ CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+ stub.deley(null, echoMessage, future);
+
+ assertFalse(future.isDone());
+ assertEquals(future.get(1, TimeUnit.SECONDS), echoMessage);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
new file mode 100644
index 0000000..04bb13b
--- /dev/null
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
+import org.apache.tajo.util.NetUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class TestBlockingRpc {
+ public static String MESSAGE = "TestBlockingRpc";
+
+ private BlockingRpcServer server;
+ private BlockingRpcClient client;
+ private BlockingInterface stub;
+ private DummyProtocolBlockingImpl service;
+
+ @Before
+ public void setUp() throws Exception {
+ service = new DummyProtocolBlockingImpl();
+ server = new BlockingRpcServer(DummyProtocol.class, service,
+ new InetSocketAddress("127.0.0.1", 0));
+ server.start();
+ client = new BlockingRpcClient(DummyProtocol.class,
+ NetUtils.getConnectAddress(server.getListenAddress()));
+ stub = client.getStub();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if(client != null) {
+ client.close();
+ }
+ if(server != null) {
+ server.shutdown();
+ }
+ }
+
+ @Test
+ public void testRpc() throws Exception {
+ SumRequest request = SumRequest.newBuilder()
+ .setX1(1)
+ .setX2(2)
+ .setX3(3.15d)
+ .setX4(2.0f).build();
+ SumResponse response1 = stub.sum(null, request);
+ assertTrue(8.15d == response1.getResult());
+
+ EchoMessage message = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+ EchoMessage response2 = stub.echo(null, message);
+ assertEquals(MESSAGE, response2.getMessage());
+ }
+
+ @Test
+ public void testGetNull() throws Exception {
+ assertNull(stub.getNull(null, null));
+ assertTrue(service.getNullCalled);
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+ final StringBuilder error = new StringBuilder();
+ Thread callThread = new Thread() {
+ public void run() {
+ try {
+ EchoMessage message = EchoMessage.newBuilder()
+ .setMessage(MESSAGE)
+ .build();
+ stub.deley(null, message);
+ } catch (Exception e) {
+ e.printStackTrace();
+ error.append(e.getMessage());
+ }
+ synchronized(error) {
+ error.notifyAll();
+ }
+ }
+ };
+
+ callThread.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread shutdownThread = new Thread() {
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ try {
+ server.shutdown();
+ server = null;
+ latch.countDown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ shutdownThread.start();
+
+ latch.await(5 * 1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(latch.getCount() == 0);
+
+ synchronized(error) {
+ error.wait(5 * 1000);
+ }
+
+ if(!error.toString().isEmpty()) {
+ fail(error.toString());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
deleted file mode 100644
index 0d01a49..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.apache.tajo.util.NetUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-
-public class TestProtoAsyncRpc {
- private static Log LOG = LogFactory.getLog(TestProtoAsyncRpc.class);
- private static String MESSAGE = "TestProtoAsyncRpc";
-
- double sum;
- String echo;
-
- static ProtoAsyncRpcServer server;
- static ProtoAsyncRpcClient client;
- static Interface stub;
- static DummyProtocolAsyncImpl service;
-
- @Before
- public void setUp() throws Exception {
- service = new DummyProtocolAsyncImpl();
- server = new ProtoAsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0));
- server.start();
- client = new ProtoAsyncRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(server.getListenAddress()));
- stub = client.getStub();
- }
-
- @After
- public void tearDown() throws Exception {
- if(client != null) {
- client.close();
- }
- if(server != null) {
- server.shutdown();
- }
- }
-
- boolean calledMarker = false;
- @Test
- public void testRpc() throws Exception {
-
- SumRequest sumRequest = SumRequest.newBuilder()
- .setX1(1)
- .setX2(2)
- .setX3(3.15d)
- .setX4(2.0f).build();
-
- stub.sum(null, sumRequest, new RpcCallback<SumResponse>() {
- @Override
- public void run(SumResponse parameter) {
- sum = parameter.getResult();
- assertTrue(8.15d == sum);
- }
- });
-
-
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
- @Override
- public void run(EchoMessage parameter) {
- echo = parameter.getMessage();
- assertEquals(MESSAGE, echo);
- calledMarker = true;
- }
- };
- stub.echo(null, echoMessage, callback);
- Thread.sleep(1000);
- assertTrue(calledMarker);
- }
-
- private CountDownLatch testNullLatch;
-
- @Test
- public void testGetNull() throws Exception {
- testNullLatch = new CountDownLatch(1);
- stub.getNull(null, null, new RpcCallback<EchoMessage>() {
- @Override
- public void run(EchoMessage parameter) {
- assertNull(parameter);
- LOG.info("testGetNull retrieved");
- testNullLatch.countDown();
- }
- });
- testNullLatch.await(1000, TimeUnit.MILLISECONDS);
- assertTrue(service.getNullCalled);
- }
-
- @Test
- public void testCallFuture() throws Exception {
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture2<EchoMessage> future = new CallFuture2<EchoMessage>();
- stub.deley(null, echoMessage, future);
-
- assertFalse(future.isDone());
- assertEquals(future.get(), echoMessage);
- assertTrue(future.isDone());
- }
-
- @Test(expected = TimeoutException.class)
- public void testCallFutureTimeout() throws Exception {
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture2<EchoMessage> future = new CallFuture2<EchoMessage>();
- stub.deley(null, echoMessage, future);
-
- assertFalse(future.isDone());
- assertEquals(future.get(1, TimeUnit.SECONDS), echoMessage);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
deleted file mode 100644
index 732f7f1..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.apache.tajo.util.NetUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-public class TestProtoBlockingRpc {
- public static String MESSAGE = "TestProtoBlockingRpc";
-
- private ProtoBlockingRpcServer server;
- private ProtoBlockingRpcClient client;
- private BlockingInterface stub;
- private DummyProtocolBlockingImpl service;
-
- @Before
- public void setUp() throws Exception {
- service = new DummyProtocolBlockingImpl();
- server = new ProtoBlockingRpcServer(DummyProtocol.class, service,
- new InetSocketAddress("127.0.0.1", 0));
- server.start();
- client = new ProtoBlockingRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(server.getListenAddress()));
- stub = client.getStub();
- }
-
- @After
- public void tearDown() throws Exception {
- if(client != null) {
- client.close();
- }
- if(server != null) {
- server.shutdown();
- }
- }
-
- @Test
- public void testRpc() throws Exception {
- SumRequest request = SumRequest.newBuilder()
- .setX1(1)
- .setX2(2)
- .setX3(3.15d)
- .setX4(2.0f).build();
- SumResponse response1 = stub.sum(null, request);
- assertTrue(8.15d == response1.getResult());
-
- EchoMessage message = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- EchoMessage response2 = stub.echo(null, message);
- assertEquals(MESSAGE, response2.getMessage());
- }
-
- @Test
- public void testGetNull() throws Exception {
- assertNull(stub.getNull(null, null));
- assertTrue(service.getNullCalled);
- }
-
- @Test
- public void testShutdown() throws Exception {
- final StringBuilder error = new StringBuilder();
- Thread callThread = new Thread() {
- public void run() {
- try {
- EchoMessage message = EchoMessage.newBuilder()
- .setMessage(MESSAGE)
- .build();
- stub.deley(null, message);
- } catch (Exception e) {
- e.printStackTrace();
- error.append(e.getMessage());
- }
- synchronized(error) {
- error.notifyAll();
- }
- }
- };
-
- callThread.start();
-
- final CountDownLatch latch = new CountDownLatch(1);
- Thread shutdownThread = new Thread() {
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- try {
- server.shutdown();
- server = null;
- latch.countDown();
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- };
- shutdownThread.start();
-
- latch.await(5 * 1000, TimeUnit.MILLISECONDS);
-
- assertTrue(latch.getCount() == 0);
-
- synchronized(error) {
- error.wait(5 * 1000);
- }
-
- if(!error.toString().isEmpty()) {
- fail(error.toString());
- }
- }
-}
\ No newline at end of file
[2/2] git commit: TAJO-257: Unit tests occasionally fail. (hyunsik)
Posted by hy...@apache.org.
TAJO-257: Unit tests occasionally fail. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fd60ec39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fd60ec39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fd60ec39
Branch: refs/heads/master
Commit: fd60ec395b951764caaed8a8ebe5bf5bef1eb96f
Parents: 202aa29
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Oct 16 20:59:17 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Oct 16 20:59:17 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tajo/catalog/CatalogClient.java | 6 +-
.../org/apache/tajo/catalog/CatalogServer.java | 6 +-
.../java/org/apache/tajo/client/TajoClient.java | 14 +-
.../tajo/engine/json/FragmentDeserializer.java | 54 -----
.../apache/tajo/master/TajoContainerProxy.java | 10 +-
.../tajo/master/TajoMasterClientService.java | 6 +-
.../apache/tajo/master/TajoMasterService.java | 6 +-
.../master/querymaster/QueryInProgress.java | 6 +-
.../tajo/master/querymaster/QueryMaster.java | 9 +-
.../master/querymaster/QueryMasterTask.java | 8 +-
.../tajo/worker/TajoResourceAllocator.java | 6 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 12 +-
.../tajo/worker/TajoWorkerClientService.java | 6 +-
.../tajo/worker/TajoWorkerManagerService.java | 6 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 17 +-
tajo-rpc/pom.xml | 2 -
.../org/apache/tajo/rpc/AsyncRpcClient.java | 206 ++++++++++++++++
.../org/apache/tajo/rpc/AsyncRpcServer.java | 124 ++++++++++
.../org/apache/tajo/rpc/BlockingRpcClient.java | 236 +++++++++++++++++++
.../org/apache/tajo/rpc/BlockingRpcServer.java | 120 ++++++++++
.../java/org/apache/tajo/rpc/CallFuture.java | 65 +++--
.../java/org/apache/tajo/rpc/CallFuture2.java | 74 ------
.../main/java/org/apache/tajo/rpc/Callback.java | 112 ---------
.../org/apache/tajo/rpc/NettyAsyncRpcProxy.java | 177 --------------
.../apache/tajo/rpc/NettyBlockingRpcProxy.java | 157 ------------
.../main/java/org/apache/tajo/rpc/NettyRpc.java | 42 ----
.../org/apache/tajo/rpc/NettyRpcServer.java | 161 -------------
.../apache/tajo/rpc/ProtoAsyncRpcClient.java | 206 ----------------
.../apache/tajo/rpc/ProtoAsyncRpcServer.java | 124 ----------
.../apache/tajo/rpc/ProtoBlockingRpcClient.java | 236 -------------------
.../apache/tajo/rpc/ProtoBlockingRpcServer.java | 120 ----------
.../tajo/rpc/benchmark/BenchmarkAsyncRPC.java | 92 --------
.../rpc/benchmark/BenchmarkBlockingRPC.java | 94 --------
.../tajo/rpc/benchmark/BenchmarkHadoopRPC.java | 107 ---------
.../src/main/proto/ProtoParamRpcProtocol.proto | 25 --
.../src/main/proto/ProtoParamRpcProtos.proto | 31 ---
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 148 ++++++++++++
.../org/apache/tajo/rpc/TestBlockingRpc.java | 141 +++++++++++
.../org/apache/tajo/rpc/TestProtoAsyncRpc.java | 148 ------------
.../apache/tajo/rpc/TestProtoBlockingRpc.java | 141 -----------
41 files changed, 1068 insertions(+), 2195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 074416f..433d782 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -201,6 +201,8 @@ Release 0.2.0 - unreleased
BUG FIXES
+ TAJO-257: Unit tests occassionally fail. (hyunsik)
+
TAJO-169: the default TAJO_WORKER_STANDBY_MODE in tajo-env.sh is wrong.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 5190998..6240631 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.rpc.ProtoBlockingRpcClient;
+import org.apache.tajo.rpc.BlockingRpcClient;
import org.apache.tajo.util.NetUtils;
import java.io.IOException;
@@ -34,7 +34,7 @@ import java.net.InetSocketAddress;
*/
public class CatalogClient extends AbstractCatalogClient {
private final Log LOG = LogFactory.getLog(CatalogClient.class);
- private ProtoBlockingRpcClient client;
+ private BlockingRpcClient client;
/**
* @throws java.io.IOException
@@ -53,7 +53,7 @@ public class CatalogClient extends AbstractCatalogClient {
String addrStr = NetUtils.normalizeInetSocketAddress(serverAddr);
LOG.info("Trying to connect the catalog (" + addrStr + ")");
try {
- client = new ProtoBlockingRpcClient(CatalogProtocol.class, serverAddr);
+ client = new BlockingRpcClient(CatalogProtocol.class, serverAddr);
setStub((BlockingInterface) client.getStub());
} catch (Exception e) {
throw new IOException("Can't connect the catalog server (" + addrStr +")");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index f15d962..62096b1 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -33,7 +33,7 @@ import org.apache.tajo.catalog.store.DerbyStore;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
@@ -70,7 +70,7 @@ public class CatalogServer extends AbstractService {
List<FunctionDescProto>>();
// RPC variables
- private ProtoBlockingRpcServer rpcServer;
+ private BlockingRpcServer rpcServer;
private InetSocketAddress bindAddress;
private String bindAddressStr;
final CatalogProtocolHandler handler;
@@ -137,7 +137,7 @@ public class CatalogServer extends AbstractService {
String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
try {
- this.rpcServer = new ProtoBlockingRpcServer(
+ this.rpcServer = new BlockingRpcServer(
CatalogProtocol.class,
handler, initIsa);
this.rpcServer.start();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 324ced4..19fa618 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -36,7 +36,7 @@ import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
-import org.apache.tajo.rpc.ProtoBlockingRpcClient;
+import org.apache.tajo.rpc.BlockingRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.util.NetUtils;
@@ -52,14 +52,14 @@ public class TajoClient {
private final Log LOG = LogFactory.getLog(TajoClient.class);
private final TajoConf conf;
- private ProtoBlockingRpcClient tasjoMasterClient;
+ private BlockingRpcClient tasjoMasterClient;
private TajoMasterClientProtocolService.BlockingInterface tajoMasterService;
private Map<QueryId, QueryMasterClientProtocolService.BlockingInterface> queryMasterConnectionMap =
new HashMap<QueryId, QueryMasterClientProtocolService.BlockingInterface>();
- private Map<QueryId, ProtoBlockingRpcClient> queryMasterClientMap =
- new HashMap<QueryId, ProtoBlockingRpcClient>();
+ private Map<QueryId, BlockingRpcClient> queryMasterClientMap =
+ new HashMap<QueryId, BlockingRpcClient>();
public TajoClient(TajoConf conf) throws IOException {
this.conf = conf;
@@ -81,7 +81,7 @@ public class TajoClient {
private void connect(InetSocketAddress addr) throws IOException {
try {
- tasjoMasterClient = new ProtoBlockingRpcClient(TajoMasterClientProtocol.class, addr);
+ tasjoMasterClient = new BlockingRpcClient(TajoMasterClientProtocol.class, addr);
tajoMasterService = tasjoMasterClient.getStub();
} catch (Exception e) {
throw new IOException(e);
@@ -91,7 +91,7 @@ public class TajoClient {
public void close() {
tasjoMasterClient.close();
- for(ProtoBlockingRpcClient eachClient: queryMasterClientMap.values()) {
+ for(BlockingRpcClient eachClient: queryMasterClientMap.values()) {
eachClient.close();
}
queryMasterClientMap.clear();
@@ -177,7 +177,7 @@ public class TajoClient {
private void connectionToQueryMaster(QueryId queryId, String queryMasterHost, int queryMasterPort) {
try {
InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterHost, queryMasterPort);
- ProtoBlockingRpcClient client = new ProtoBlockingRpcClient(QueryMasterClientProtocol.class, addr);
+ BlockingRpcClient client = new BlockingRpcClient(QueryMasterClientProtocol.class, addr);
QueryMasterClientProtocolService.BlockingInterface service = client.getStub();
queryMasterConnectionMap.put(queryId, service);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
deleted file mode 100644
index 63dd1fa..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.engine.json;
-
-import com.google.gson.*;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMetaImpl;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.storage.Fragment;
-
-import java.lang.reflect.Type;
-
-public class FragmentDeserializer implements JsonDeserializer<Fragment> {
-
- @Override
- public Fragment deserialize(JsonElement json, Type type,
- JsonDeserializationContext ctx) throws JsonParseException {
- Gson gson = CoreGsonHelper.getInstance();
- JsonObject fragObj = json.getAsJsonObject();
- JsonObject metaObj = fragObj.get("meta").getAsJsonObject();
- TableMetaImpl meta = new TableMetaImpl(
- gson.fromJson(metaObj.get("schema"), Schema.class),
- gson.fromJson(metaObj.get("storeType"), StoreType.class),
- gson.fromJson(metaObj.get("options"), Options.class));
- Fragment fragment = new Fragment(fragObj.get("tabletId").getAsString(),
- gson.fromJson(fragObj.get("path"), Path.class),
- meta,
- fragObj.get("startOffset").getAsLong(),
- fragObj.get("length").getAsLong());
- return fragment;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 5f17c71..846343d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -30,8 +30,8 @@ import org.apache.tajo.master.event.QueryEventType;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.rm.TajoWorkerContainer;
import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -62,13 +62,13 @@ public class TajoContainerProxy extends ContainerProxy {
}
private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
- ProtoAsyncRpcClient tajoWorkerRpc = null;
+ AsyncRpcClient tajoWorkerRpc = null;
try {
InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
.getTajoWorkerManagerService().getBindAddr();
InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
- tajoWorkerRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+ tajoWorkerRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
TajoWorkerProtocol.RunExecutionBlockRequestProto request =
@@ -93,8 +93,8 @@ public class TajoContainerProxy extends ContainerProxy {
}
class AyncRpcClose extends Thread {
- ProtoAsyncRpcClient client;
- public AyncRpcClose(ProtoAsyncRpcClient client) {
+ AsyncRpcClient client;
+ public AyncRpcClose(AsyncRpcClient client) {
this.client = client;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index b6efd1d..38ac2a4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -45,7 +45,7 @@ import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.RemoteException;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
@@ -61,7 +61,7 @@ public class TajoMasterClientService extends AbstractService {
private final TajoConf conf;
private final CatalogService catalog;
private final TajoMasterClientProtocolServiceHandler clientHandler;
- private ProtoBlockingRpcServer server;
+ private BlockingRpcServer server;
private InetSocketAddress bindAddress;
private final BoolProto BOOL_TRUE =
@@ -84,7 +84,7 @@ public class TajoMasterClientService extends AbstractService {
String confClientServiceAddr = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
try {
- server = new ProtoBlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
+ server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
} catch (Exception e) {
LOG.error(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index d518dce..4b27e46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -29,7 +29,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.util.NetUtils;
@@ -43,7 +43,7 @@ public class TajoMasterService extends AbstractService {
private final TajoMaster.MasterContext context;
private final TajoConf conf;
private final TajoMasterServiceHandler masterHandler;
- private ProtoAsyncRpcServer server;
+ private AsyncRpcServer server;
private InetSocketAddress bindAddress;
private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
@@ -61,7 +61,7 @@ public class TajoMasterService extends AbstractService {
String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
try {
- server = new ProtoAsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
+ server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
} catch (Exception e) {
LOG.error(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2c76ac8..0098a7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -34,8 +34,8 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import java.net.InetSocketAddress;
@@ -60,7 +60,7 @@ public class QueryInProgress extends CompositeService {
private final TajoMaster.MasterContext masterContext;
- private ProtoAsyncRpcClient queryMasterRpc;
+ private AsyncRpcClient queryMasterRpc;
private TajoWorkerProtocol.TajoWorkerProtocolService queryMasterRpcClient;
@@ -180,7 +180,7 @@ public class QueryInProgress extends CompositeService {
queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
//TODO Get Connection from pool
- queryMasterRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+ queryMasterRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
queryMasterRpcClient = queryMasterRpc.getStub();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 4690567..fa6790d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -34,13 +34,16 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.QueryStartEvent;
-import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.worker.TajoWorker;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -261,7 +264,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
if(queryMasterTask != null) {
TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
- CallFuture2 future = new CallFuture2();
+ CallFuture future = new CallFuture();
workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, future);
try {
future.get(3, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index c386bf2..ae1508c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -41,13 +41,12 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.master.GlobalEngine;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.GlobalEngine;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.rpc.CallFuture2;
-import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
@@ -57,7 +56,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -161,7 +159,7 @@ public class QueryMasterTask extends CompositeService {
LOG.info("Stopping QueryMasterTask:" + queryId);
- CallFuture2 future = new CallFuture2();
+ CallFuture future = new CallFuture();
queryMasterContext.getWorkerContext().getTajoMasterRpcClient()
.stopQueryMaster(null, queryId.getProto(), future);
try {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index fa2ff14..385add1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -46,7 +46,7 @@ import org.apache.tajo.master.querymaster.SubQueryState;
import org.apache.tajo.master.rm.TajoWorkerContainer;
import org.apache.tajo.master.rm.TajoWorkerContainerId;
import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.util.ApplicationIdUtils;
import java.io.IOException;
@@ -221,8 +221,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
- CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
- new CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+ CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
+ new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
int requiredMemoryMBSlot = 512; //TODO
int requiredDiskSlots = 1; //TODO
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 37b754a..99fce45 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -36,8 +36,8 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryMaster;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.CallFuture2;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.NetUtils;
@@ -73,7 +73,7 @@ public class TajoWorker extends CompositeService {
private InetSocketAddress tajoMasterAddress;
//to TajoMaster
- private ProtoAsyncRpcClient tajoMasterRpc;
+ private AsyncRpcClient tajoMasterRpc;
private TajoMasterProtocol.TajoMasterProtocolService tajoMasterRpcClient;
@@ -306,7 +306,7 @@ public class TajoWorker extends CompositeService {
while(true) {
try {
- tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, this.tajoMasterAddress);
+ tajoMasterRpc = new AsyncRpcClient(TajoMasterProtocol.class, this.tajoMasterAddress);
tajoMasterRpcClient = tajoMasterRpc.getStub();
break;
} catch (Exception e) {
@@ -378,8 +378,8 @@ public class TajoWorker extends CompositeService {
}
public void run() {
- CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
- new CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse>();
+ CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+ new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
LOG.info("Worker Resource Heartbeat Thread start.");
int sendDiskInfoCount = 0;
int pullServerPort = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index dc27f1a..177e920 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -33,7 +33,7 @@ import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.master.querymaster.Query;
import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
@@ -46,7 +46,7 @@ public class TajoWorkerClientService extends AbstractService {
private final PrimitiveProtos.BoolProto BOOL_FALSE =
PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
- private ProtoBlockingRpcServer rpcServer;
+ private BlockingRpcServer rpcServer;
private InetSocketAddress bindAddr;
private String addr;
private int port;
@@ -76,7 +76,7 @@ public class TajoWorkerClientService extends AbstractService {
}
// TODO blocking/non-blocking??
- this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
+ this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 76011dc..4011829 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -36,7 +36,7 @@ import org.apache.tajo.master.TaskSchedulerImpl;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.querymaster.QueryMaster;
import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
@@ -46,7 +46,7 @@ public class TajoWorkerManagerService extends CompositeService
implements TajoWorkerProtocol.TajoWorkerProtocolService.Interface {
private static final Log LOG = LogFactory.getLog(TajoWorkerManagerService.class.getName());
- private ProtoAsyncRpcServer rpcServer;
+ private AsyncRpcServer rpcServer;
private InetSocketAddress bindAddr;
private String addr;
private int port;
@@ -75,7 +75,7 @@ public class TajoWorkerManagerService extends CompositeService
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}
- this.rpcServer = new ProtoAsyncRpcServer(TajoWorkerProtocol.class, this, initIsa);
+ this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa);
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 9fa158a..b26c3f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -39,13 +39,12 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
import org.apache.tajo.util.TajoIdUtils;
import java.net.InetSocketAddress;
-import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.*;
@@ -101,7 +100,7 @@ public class TaskRunner extends AbstractService {
private String baseDir;
private Path baseDirPath;
- private ProtoAsyncRpcClient client;
+ private AsyncRpcClient client;
private TaskRunnerManager taskRunnerManager;
@@ -137,10 +136,10 @@ public class TaskRunner extends AbstractService {
// initialize MasterWorkerProtocol as an actual task owner.
this.client =
- taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
+ taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
@Override
- public ProtoAsyncRpcClient run() throws Exception {
- return new ProtoAsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+ public AsyncRpcClient run() throws Exception {
+ return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
}
});
this.master = client.getStub();
@@ -297,13 +296,13 @@ public class TaskRunner extends AbstractService {
@Override
public void run() {
int receivedNum = 0;
- CallFuture2<QueryUnitRequestProto> callFuture = null;
+ CallFuture<QueryUnitRequestProto> callFuture = null;
QueryUnitRequestProto taskRequest = null;
while(!stopped) {
try {
if (callFuture == null) {
- callFuture = new CallFuture2<QueryUnitRequestProto>();
+ callFuture = new CallFuture<QueryUnitRequestProto>();
LOG.info("Request GetTask: " + getId());
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 4957bc4..2e5d2f1 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -83,8 +83,6 @@
<arguments>
<argument>-Isrc/main/proto/</argument>
<argument>--java_out=target/generated-sources/proto</argument>
- <argument>src/main/proto/ProtoParamRpcProtos.proto</argument>
- <argument>src/main/proto/ProtoParamRpcProtocol.proto</argument>
<argument>src/main/proto/DummyProtos.proto</argument>
<argument>src/main/proto/RpcProtos.proto</argument>
<argument>src/main/proto/TestProtos.proto</argument>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
new file mode 100644
index 0000000..b062eca
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.*;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Netty-based asynchronous protobuf RPC client.
+ *
+ * <p>The constructor reflectively resolves the generated
+ * {@code <Protocol>$<Protocol>Service.newStub(RpcChannel)} factory, initializes
+ * the base client with a protobuf pipeline, and wraps the resulting channel in
+ * an {@link RpcChannel}. Every outgoing call is tagged with a sequence id; the
+ * {@link RpcResponse} carrying the same id is handed to the callback that was
+ * registered for it.</p>
+ */
+public class AsyncRpcClient extends NettyClientBase {
+  // Log under this class's own category (previously RpcProtos, which
+  // attributed every message to the generated protobuf holder class).
+  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
+
+  private final ChannelUpstreamHandler handler;
+  private final ChannelPipelineFactory pipeFactory;
+  private final ProxyRpcChannel rpcChannel;
+
+  /** Source of per-call sequence ids. */
+  private final AtomicInteger sequence = new AtomicInteger(0);
+  /** Calls that were sent and still await a response, keyed by sequence id. */
+  private final Map<Integer, ResponseCallback> requests =
+      new ConcurrentHashMap<Integer, ResponseCallback>();
+
+  private final Class<?> protocol;
+  private final Method stubMethod;
+
+  /**
+   * @param protocol outer class generated from the .proto service definition;
+   *                 its nested {@code <SimpleName>Service} class must expose
+   *                 {@code newStub(RpcChannel)}
+   * @param addr     address handed to {@code NettyClientBase.init}
+   * @throws Exception if the service class or factory method cannot be
+   *                   resolved, or if the base client fails to initialize
+   */
+  public AsyncRpcClient(final Class<?> protocol,
+                        final InetSocketAddress addr)
+      throws Exception {
+
+    this.protocol = protocol;
+    String serviceClassName = protocol.getName() + "$"
+        + protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
+
+    this.handler = new ClientChannelUpstreamHandler();
+    pipeFactory = new ProtoPipelineFactory(handler,
+        RpcResponse.getDefaultInstance());
+    super.init(addr, pipeFactory);
+    rpcChannel = new ProxyRpcChannel(getChannel());
+  }
+
+  /**
+   * Creates a stub bound to this client's channel by invoking the generated
+   * static {@code newStub} factory.
+   *
+   * @param <T> stub type expected by the caller; not verified here
+   * @throws Exception if the reflective invocation fails
+   */
+  @SuppressWarnings("unchecked") // the factory's return type is only known to the caller
+  public <T> T getStub() throws Exception {
+    return (T) stubMethod.invoke(null, rpcChannel);
+  }
+
+  public RpcChannel getRpcChannel() {
+    return this.rpcChannel;
+  }
+
+  /** {@link RpcChannel} that serializes calls onto the Netty channel. */
+  private class ProxyRpcChannel implements RpcChannel {
+    private final Channel channel;
+    private final ClientChannelUpstreamHandler handler;
+
+    /**
+     * @throws IllegalArgumentException if the channel's pipeline holds no
+     *         {@code ClientChannelUpstreamHandler}
+     */
+    public ProxyRpcChannel(Channel channel) {
+      this.channel = channel;
+      this.handler = channel.getPipeline()
+          .get(ClientChannelUpstreamHandler.class);
+
+      if (handler == null) {
+        throw new IllegalArgumentException("Channel does not have " +
+            "proper handler");
+      }
+    }
+
+    /**
+     * Assigns the next sequence id, registers the callback under it and then
+     * writes the request, so the callback is in place before any response can
+     * be matched against it.
+     */
+    public void callMethod(final MethodDescriptor method,
+                           final RpcController controller,
+                           final Message param,
+                           final Message responseType,
+                           RpcCallback<Message> done) {
+
+      int nextSeqId = sequence.getAndIncrement();
+
+      Message rpcRequest = buildRequest(nextSeqId, method, param);
+
+      handler.registerCallback(nextSeqId,
+          new ResponseCallback(controller, responseType, done));
+      channel.write(rpcRequest);
+    }
+
+    /** Builds the wire request; the payload is omitted when {@code param} is null. */
+    private Message buildRequest(int seqId,
+                                 MethodDescriptor method,
+                                 Message param) {
+
+      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+          .setId(seqId)
+          .setMethodName(method.getName());
+
+      if (param != null) {
+        requestBuilder.setRequestMessage(param.toByteString());
+      }
+
+      return requestBuilder.build();
+    }
+  }
+
+  /** Decodes an {@link RpcResponse} and forwards the result to the user callback. */
+  private class ResponseCallback implements RpcCallback<RpcResponse> {
+    private final RpcController controller;
+    private final Message responsePrototype;
+    private final RpcCallback<Message> callback;
+
+    public ResponseCallback(RpcController controller,
+                            Message responsePrototype,
+                            RpcCallback<Message> callback) {
+      this.controller = controller;
+      this.responsePrototype = responsePrototype;
+      this.callback = callback;
+    }
+
+    /**
+     * Completes the call. On any failure (error message present, or payload
+     * that cannot be parsed) the controller, if one was supplied, is marked
+     * failed, the user callback is invoked with {@code null}, and a
+     * {@link RemoteException} is thrown to the caller of this method.
+     */
+    public void run(RpcResponse rpcResponse) {
+
+      // An error message marks an rpc-level failure. The user callback is
+      // still invoked (with null) before the failure is raised.
+      if (rpcResponse.hasErrorMessage()) {
+        if (controller != null) {
+          this.controller.setFailed(rpcResponse.getErrorMessage());
+        }
+        callback.run(null);
+        throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
+      }
+
+      Message responseMessage = null;
+      if (rpcResponse.hasResponseMessage()) {
+        try {
+          responseMessage = responsePrototype.newBuilderForType().mergeFrom(
+              rpcResponse.getResponseMessage()).build();
+        } catch (InvalidProtocolBufferException e) {
+          // Undecodable payload: complete the call the same way as an
+          // rpc-level error instead of leaving the callback un-invoked.
+          if (controller != null) {
+            controller.setFailed(e.toString());
+          }
+          callback.run(null);
+          throw new RemoteException(getErrorMessage(""), e);
+        }
+      }
+
+      callback.run(responseMessage);
+    }
+  }
+
+  /** Prefixes {@code message} with the protocol name and the remote address. */
+  private String getErrorMessage(String message) {
+    return "Exception [" + protocol.getCanonicalName() +
+        "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+        getChannel().getRemoteAddress()) + ")]: " + message;
+  }
+
+  /** Inbound handler that matches responses to registered callbacks. */
+  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+
+    /**
+     * @throws RemoteException if a callback is already registered under
+     *         {@code seqId}
+     */
+    synchronized void registerCallback(int seqId, ResponseCallback callback) {
+
+      if (requests.containsKey(seqId)) {
+        throw new RemoteException(
+            getErrorMessage("Duplicate Sequence Id "+ seqId));
+      }
+
+      requests.put(seqId, callback);
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+
+      RpcResponse response = (RpcResponse) e.getMessage();
+      // remove() both looks up and unregisters, so each id is served once.
+      ResponseCallback callback = requests.remove(response.getId());
+
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+      } else {
+        callback.run(response);
+      }
+    }
+
+    /** Logs the failure, closes the channel and rethrows it as a RemoteException. */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      LOG.error(addr + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
+      e.getChannel().close();
+      throw new RemoteException(getErrorMessage(addr.toString()), e.getCause());
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
new file mode 100644
index 0000000..5cf830e
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+import org.jboss.netty.channel.*;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+/**
+ * Netty-based server that dispatches incoming {@link RpcRequest}s to a
+ * protobuf {@link Service} built reflectively from the protocol class, and
+ * writes an {@link RpcResponse} back when the service completes the call.
+ */
+public class AsyncRpcServer extends NettyServerBase {
+  private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
+
+  private final Service service;
+  private final ChannelPipelineFactory pipeline;
+
+  /**
+   * @param protocol    outer class generated from the .proto service definition
+   * @param instance    implementation of the generated {@code Interface}
+   * @param bindAddress address passed to the base server
+   * @throws Exception if the generated service classes cannot be resolved or
+   *                   the reflective service cannot be created
+   */
+  public AsyncRpcServer(final Class<?> protocol,
+                        final Object instance,
+                        final InetSocketAddress bindAddress)
+      throws Exception {
+    super(protocol.getSimpleName(), bindAddress);
+
+    final String serviceName =
+        protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+    final Class<?> serviceType = Class.forName(serviceName);
+    final Class<?> interfaceType = Class.forName(serviceName + "$Interface");
+    final Method factory =
+        serviceType.getMethod("newReflectiveService", interfaceType);
+    this.service = (Service) factory.invoke(null, instance);
+
+    this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
+        RpcRequest.getDefaultInstance());
+    super.init(this.pipeline);
+  }
+
+  /** Inbound handler: decodes a request, invokes the service, sends the reply. */
+  private class ServerHandler extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+
+      final RpcRequest rpcRequest = (RpcRequest) e.getMessage();
+      final String name = rpcRequest.getMethodName();
+      final MethodDescriptor descriptor =
+          service.getDescriptorForType().findMethodByName(name);
+
+      if (descriptor == null) {
+        throw new RemoteCallException(rpcRequest.getId(),
+            new NoSuchMethodException(name));
+      }
+
+      final Message argument = parseArgument(rpcRequest, descriptor);
+      final Channel replyChannel = e.getChannel();
+      final RpcController rpcController = new NettyRpcController();
+
+      // A request without an id gets no reply callback at all.
+      RpcCallback<Message> onDone = null;
+      if (rpcRequest.hasId()) {
+        onDone = new RpcCallback<Message>() {
+          public void run(Message result) {
+            final RpcResponse.Builder reply = RpcResponse.newBuilder();
+            reply.setId(rpcRequest.getId());
+
+            if (result != null) {
+              reply.setResponseMessage(result.toByteString());
+            }
+            if (rpcController.failed()) {
+              reply.setErrorMessage(rpcController.errorText());
+            }
+
+            replyChannel.write(reply.build());
+          }
+        };
+      }
+
+      service.callMethod(descriptor, rpcController, argument, onDone);
+    }
+
+    /**
+     * Parses the request payload with the method's request prototype.
+     *
+     * @return the parsed message, or {@code null} when the request carries no
+     *         payload
+     * @throws Exception a RemoteCallException wrapping any parse failure
+     */
+    private Message parseArgument(RpcRequest rpcRequest,
+                                  MethodDescriptor descriptor)
+        throws Exception {
+      if (!rpcRequest.hasRequestMessage()) {
+        return null;
+      }
+      try {
+        return service.getRequestPrototype(descriptor)
+            .newBuilderForType()
+            .mergeFrom(rpcRequest.getRequestMessage())
+            .build();
+      } catch (Throwable t) {
+        throw new RemoteCallException(rpcRequest.getId(), descriptor, t);
+      }
+    }
+
+    /**
+     * Sends the prepared error response for a RemoteCallException, then
+     * rethrows every cause wrapped in a RemoteException.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      final Throwable cause = e.getCause();
+      if (cause instanceof RemoteCallException) {
+        e.getChannel().write(((RemoteCallException) cause).getResponse());
+      }
+      throw new RemoteException(cause);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
new file mode 100644
index 0000000..4989c2d
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.*;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Netty-based blocking protobuf RPC client.
+ *
+ * <p>The constructor reflectively resolves the generated
+ * {@code <Protocol>$<Protocol>Service.newBlockingStub(BlockingRpcChannel)}
+ * factory and wraps the Netty channel in a {@link BlockingRpcChannel}. Each
+ * call is tagged with a sequence id and the calling thread waits on a
+ * {@link ProtoCallFuture} until the response with the same id arrives, the
+ * call fails, or the channel reports an exception.</p>
+ */
+public class BlockingRpcClient extends NettyClientBase {
+  // Log under this class's own category (previously RpcProtos, which
+  // attributed every message to the generated protobuf holder class).
+  private static final Log LOG = LogFactory.getLog(BlockingRpcClient.class);
+
+  private final ChannelUpstreamHandler handler;
+  private final ChannelPipelineFactory pipeFactory;
+  private final ProxyRpcChannel rpcChannel;
+
+  /** Source of per-call sequence ids. */
+  private final AtomicInteger sequence = new AtomicInteger(0);
+  /** Calls that were sent and still await a response, keyed by sequence id. */
+  private final Map<Integer, ProtoCallFuture> requests =
+      new ConcurrentHashMap<Integer, ProtoCallFuture>();
+
+  private final Class<?> protocol;
+  private final Method stubMethod;
+
+  /**
+   * @param protocol outer class generated from the .proto service definition;
+   *                 its nested {@code <SimpleName>Service} class must expose
+   *                 {@code newBlockingStub(BlockingRpcChannel)}
+   * @param addr     address handed to {@code NettyClientBase.init}
+   * @throws Exception if the service class or factory method cannot be
+   *                   resolved, or if the base client fails to initialize
+   */
+  public BlockingRpcClient(final Class<?> protocol,
+                           final InetSocketAddress addr)
+      throws Exception {
+
+    this.protocol = protocol;
+    String serviceClassName = protocol.getName() + "$"
+        + protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    stubMethod = serviceClass.getMethod("newBlockingStub",
+        BlockingRpcChannel.class);
+
+    this.handler = new ClientChannelUpstreamHandler();
+    pipeFactory = new ProtoPipelineFactory(handler,
+        RpcResponse.getDefaultInstance());
+    super.init(addr, pipeFactory);
+    rpcChannel = new ProxyRpcChannel(getChannel());
+  }
+
+  /**
+   * Creates a blocking stub bound to this client's channel by invoking the
+   * generated static {@code newBlockingStub} factory.
+   *
+   * @param <T> stub type expected by the caller; not verified here
+   * @throws Exception if the reflective invocation fails
+   */
+  @SuppressWarnings("unchecked") // the factory's return type is only known to the caller
+  public <T> T getStub() throws Exception {
+    return (T) stubMethod.invoke(null, rpcChannel);
+  }
+
+  public BlockingRpcChannel getBlockingRpcChannel() {
+    return this.rpcChannel;
+  }
+
+  /** {@link BlockingRpcChannel} that sends a request and waits for its reply. */
+  private class ProxyRpcChannel implements BlockingRpcChannel {
+    private final Channel channel;
+    private final ClientChannelUpstreamHandler handler;
+
+    /**
+     * @throws IllegalArgumentException if the channel's pipeline holds no
+     *         {@code ClientChannelUpstreamHandler}
+     */
+    public ProxyRpcChannel(Channel channel) {
+      this.channel = channel;
+      this.handler = channel.getPipeline().
+          get(ClientChannelUpstreamHandler.class);
+
+      if (handler == null) {
+        throw new IllegalArgumentException("Channel does not have " +
+            "proper handler");
+      }
+    }
+
+    /**
+     * Sends the call and blocks until it completes.
+     *
+     * @return the decoded response, or {@code null} when the response carried
+     *         no payload or when the call failed and the failure was reported
+     *         through {@code controller}
+     * @throws RemoteException if waiting fails or is interrupted, or if the
+     *         call failed and no controller was supplied to carry the error
+     */
+    public Message callBlockingMethod(final MethodDescriptor method,
+                                      final RpcController controller,
+                                      final Message param,
+                                      final Message responsePrototype)
+        throws ServiceException {
+
+      int nextSeqId = sequence.getAndIncrement();
+
+      Message rpcRequest = buildRequest(nextSeqId, method, param);
+
+      ProtoCallFuture callFuture =
+          new ProtoCallFuture(controller, responsePrototype);
+      // Register before writing so the reply can always be matched.
+      requests.put(nextSeqId, callFuture);
+      channel.write(rpcRequest);
+
+      try {
+        return callFuture.get();
+      } catch (InterruptedException ie) {
+        // Keep the interrupt visible to the caller's thread.
+        Thread.currentThread().interrupt();
+        throw new RemoteException(ie);
+      } catch (Throwable t) {
+        throw new RemoteException(t);
+      } finally {
+        // No-op on the normal path (the handler already removed the entry);
+        // prevents a leaked entry when the wait is abandoned.
+        requests.remove(nextSeqId);
+      }
+    }
+
+    /** Builds the wire request; the payload is omitted when {@code param} is null. */
+    private Message buildRequest(int seqId,
+                                 MethodDescriptor method,
+                                 Message param) {
+      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+          .setId(seqId)
+          .setMethodName(method.getName());
+
+      if (param != null) {
+        requestBuilder.setRequestMessage(param.toByteString());
+      }
+
+      return requestBuilder.build();
+    }
+  }
+
+  /**
+   * Prefixes {@code message} with the protocol name and remote address when
+   * both are available, otherwise with a plain "Exception " marker.
+   */
+  private String getErrorMessage(String message) {
+    if(protocol != null && getChannel() != null) {
+      return "Exception [" + protocol.getCanonicalName() +
+          "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+          getChannel().getRemoteAddress()) + ")]: " + message;
+    } else {
+      return "Exception " + message;
+    }
+  }
+
+  /** Inbound handler that completes the future registered for each response. */
+  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+
+      RpcResponse rpcResponse = (RpcResponse) e.getMessage();
+      // remove() both looks up and unregisters, so each id is served once.
+      ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+        return;
+      }
+
+      if (rpcResponse.hasErrorMessage()) {
+        // Always complete the future, with or without a controller;
+        // otherwise the calling thread would wait forever.
+        callback.setFailed(rpcResponse.getErrorMessage());
+        throw new RemoteException(
+            getErrorMessage(rpcResponse.getErrorMessage()));
+      }
+
+      Message responseMessage = null;
+      if (rpcResponse.hasResponseMessage()) {
+        try {
+          responseMessage =
+              callback.returnType.newBuilderForType().
+                  mergeFrom(rpcResponse.getResponseMessage()).build();
+        } catch (Exception parseError) {
+          // The entry is already unregistered, so release the waiter here
+          // before letting the failure propagate.
+          callback.setFailed(parseError.toString());
+          throw parseError;
+        }
+      }
+
+      callback.setResponse(responseMessage);
+    }
+
+    /**
+     * Closes the channel, fails every call still waiting on it (no response
+     * can arrive once it is closed), then rethrows as a RemoteException.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      e.getChannel().close();
+      failPendingCalls(String.valueOf(e.getCause()));
+      throw new RemoteException(getErrorMessage(""), e.getCause());
+    }
+
+    /** Unregisters and fails all outstanding calls with {@code reason}. */
+    private void failPendingCalls(String reason) {
+      for (Integer seqId : requests.keySet()) {
+        ProtoCallFuture pending = requests.remove(seqId);
+        if (pending != null) {
+          pending.setFailed(reason);
+        }
+      }
+    }
+  }
+
+  /**
+   * One-shot future for a single blocking call. Completion is signalled with
+   * a latch, so {@link #isDone()} stays true after completion and
+   * {@link #get()} may be called more than once.
+   */
+  class ProtoCallFuture implements Future<Message> {
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private Message response = null;
+    private Message returnType;
+    // Written before countDown() and read after await(), which orders the accesses.
+    private String errorText = null;
+
+    private RpcController controller;
+
+    /**
+     * @param controller receives failures via {@code setFailed}; may be null
+     * @param message    prototype used to decode the response payload
+     */
+    public ProtoCallFuture(RpcController controller, Message message) {
+      this.controller = controller;
+      this.returnType = message;
+    }
+
+    @Override
+    public boolean cancel(boolean arg0) {
+      return false;
+    }
+
+    @Override
+    public Message get() throws InterruptedException, ExecutionException {
+      latch.await();
+      return result();
+    }
+
+    @Override
+    public Message get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if(latch.await(timeout, unit)) {
+        return result();
+      } else {
+        throw new TimeoutException();
+      }
+    }
+
+    /**
+     * Returns the response. A failed call made without a controller has no
+     * other way to report its error, so it is raised here instead.
+     */
+    private Message result() throws ExecutionException {
+      if (errorText != null && controller == null) {
+        throw new ExecutionException(new RemoteException(errorText));
+      }
+      return response;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return latch.getCount() == 0;
+    }
+
+    /** Completes the call successfully; {@code response} may be null. */
+    public void setResponse(Message response) {
+      this.response = response;
+      latch.countDown();
+    }
+
+    /**
+     * Completes the call as failed. The controller, when present, is marked
+     * failed; the waiter is released in either case.
+     */
+    public void setFailed(String errorText) {
+      if (this.controller != null) {
+        this.controller.setFailed(errorText);
+      }
+      this.errorText = errorText;
+      latch.countDown();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
new file mode 100644
index 0000000..3649c5e
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.*;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+/**
+ * Netty-based server that executes each incoming {@link RpcRequest} against a
+ * protobuf {@link BlockingService} built reflectively from the protocol class
+ * and writes the resulting {@link RpcResponse} to the originating channel.
+ */
+public class BlockingRpcServer extends NettyServerBase {
+  private static Log LOG = LogFactory.getLog(BlockingRpcServer.class);
+  private final BlockingService service;
+  private final ChannelPipelineFactory pipeline;
+
+  /**
+   * @param protocol    outer class generated from the .proto service definition
+   * @param instance    implementation of the generated {@code BlockingInterface}
+   * @param bindAddress address passed to the base server
+   * @throws Exception if the generated service classes cannot be resolved or
+   *                   the reflective service cannot be created
+   */
+  public BlockingRpcServer(final Class<?> protocol,
+                           final Object instance,
+                           final InetSocketAddress bindAddress)
+      throws Exception {
+
+    super(protocol.getSimpleName(), bindAddress);
+
+    final String serviceName =
+        protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+    final Class<?> serviceType = Class.forName(serviceName);
+    final Class<?> interfaceType =
+        Class.forName(serviceName + "$BlockingInterface");
+    final Method factory =
+        serviceType.getMethod("newReflectiveBlockingService", interfaceType);
+
+    this.service = (BlockingService) factory.invoke(null, instance);
+    this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
+        RpcRequest.getDefaultInstance());
+
+    super.init(this.pipeline);
+  }
+
+  /** Inbound handler: decodes a request, runs it, and writes the reply. */
+  private class ServerHandler extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      final RpcRequest rpcRequest = (RpcRequest) e.getMessage();
+      final String name = rpcRequest.getMethodName();
+      final MethodDescriptor descriptor =
+          service.getDescriptorForType().findMethodByName(name);
+
+      if (descriptor == null) {
+        throw new RemoteCallException(rpcRequest.getId(),
+            new NoSuchMethodException(name));
+      }
+
+      final Message argument = parseArgument(rpcRequest, descriptor);
+      final RpcController rpcController = new NettyRpcController();
+
+      final Message result;
+      try {
+        result = service.callBlockingMethod(descriptor, rpcController, argument);
+      } catch (Throwable t) {
+        throw new RemoteCallException(rpcRequest.getId(), descriptor, t);
+      }
+
+      final RpcResponse.Builder reply = RpcResponse.newBuilder();
+      reply.setId(rpcRequest.getId());
+
+      if (result != null) {
+        reply.setResponseMessage(result.toByteString());
+      }
+      if (rpcController.failed()) {
+        reply.setErrorMessage(rpcController.errorText());
+      }
+
+      e.getChannel().write(reply.build());
+    }
+
+    /**
+     * Parses the request payload with the method's request prototype.
+     *
+     * @return the parsed message, or {@code null} when the request carries no
+     *         payload
+     * @throws Exception a RemoteCallException wrapping any parse failure
+     */
+    private Message parseArgument(RpcRequest rpcRequest,
+                                  MethodDescriptor descriptor)
+        throws Exception {
+      if (!rpcRequest.hasRequestMessage()) {
+        return null;
+      }
+      try {
+        return service.getRequestPrototype(descriptor)
+            .newBuilderForType()
+            .mergeFrom(rpcRequest.getRequestMessage())
+            .build();
+      } catch (Throwable t) {
+        throw new RemoteCallException(rpcRequest.getId(), descriptor, t);
+      }
+    }
+
+    /**
+     * Sends the prepared error response for a RemoteCallException, then
+     * rethrows every cause wrapped in a RemoteException.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+      final Throwable cause = e.getCause();
+      if (cause instanceof RemoteCallException) {
+        e.getChannel().write(((RemoteCallException) cause).getResponse());
+      }
+
+      throw new RemoteException(cause);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
index 4a000ad..0a9adab 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
@@ -18,58 +18,57 @@
package org.apache.tajo.rpc;
-import java.util.concurrent.*;
+import com.google.protobuf.RpcCallback;
-@Deprecated
-class CallFuture implements Future<Object> {
- private Semaphore sem = new Semaphore(0);
- private Object response = null;
- @SuppressWarnings("rawtypes")
- private Class returnType;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
- @SuppressWarnings("rawtypes")
- public CallFuture(Class returnType) {
- this.returnType = returnType;
+public class CallFuture<T> implements RpcCallback<T>, Future<T> { // RpcCallback that is also a Future: run(T) stores the response and releases one permit for get()
+
+ private final Semaphore sem = new Semaphore(0);
+ private boolean done = false; // NOTE(review): plain (non-volatile) flag read by isDone(); cross-thread visibility is not guaranteed - confirm
+ private T response;
+
+ @Override
+ public void run(T t) { // callback entry point: records the response, marks the future done, releases one permit
+ this.response = t;
+ done = true;
+ sem.release();
}
- @SuppressWarnings("rawtypes")
- public Class getReturnType() {
- return this.returnType;
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // TODO - to be implemented
+ throw new UnsupportedOperationException(); // cancellation is not supported yet
}
@Override
- public boolean cancel(boolean arg0) {
- return false;
+ public boolean isCancelled() {
+ // TODO - to be implemented
+ throw new UnsupportedOperationException(); // cancellation state cannot be queried yet
}
@Override
- public Object get() throws InterruptedException, ExecutionException {
+ public boolean isDone() {
+ return done; // true once run(T) has been invoked
+ }
+
+ @Override
+ public T get() throws InterruptedException { // blocks until a permit is available; each call consumes one, so presumably a second get() would block - verify
sem.acquire();
+
return response;
}
@Override
- public Object get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException, TimeoutException {
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException { // waits up to the timeout for a permit; TimeoutException if none becomes available
if (sem.tryAcquire(timeout, unit)) {
return response;
} else {
throw new TimeoutException();
}
}
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return sem.availablePermits() > 0;
- }
-
- public void setResponse(Object response) {
- this.response = response;
- sem.release();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java
deleted file mode 100644
index ddff447..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public class CallFuture2<T> implements RpcCallback<T>, Future<T> {
-
- private final Semaphore sem = new Semaphore(0);
- private boolean done = false;
- private T response;
-
- @Override
- public void run(T t) {
- this.response = t;
- done = true;
- sem.release();
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- // TODO - to be implemented
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isCancelled() {
- // TODO - to be implemented
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isDone() {
- return done;
- }
-
- @Override
- public T get() throws InterruptedException {
- sem.acquire();
-
- return response;
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException {
- if (sem.tryAcquire(timeout, unit)) {
- return response;
- } else {
- throw new TimeoutException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java
deleted file mode 100644
index 1a5c54d..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.util.concurrent.*;
-
-@Deprecated
-public class Callback<T> implements Future<T> {
- public enum Status {
- READY, SUCCESS, FAILURE
- }
-
- private Status status;
- private Semaphore sem = new Semaphore(0);
- private T result = null;
- private RemoteException err;
-
- public Callback() {
- status = Status.READY;
- }
-
- public void onComplete(T response) {
- status = Status.SUCCESS;
- result = response;
- sem.release();
- }
-
- public void onFailure(RemoteException error) {
- status = Status.FAILURE;
- result = null;
- err = error;
- sem.release();
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- if (!didGetResponse()) {
- sem.acquire();
- }
-
- if (isFailure()) {
- throw err;
- }
- return result;
- }
-
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException, TimeoutException {
- if (!didGetResponse()) {
- if (sem.tryAcquire(timeout, unit)) {
- if (isFailure()) {
- throw err;
- }
- return result;
- } else {
- throw new TimeoutException();
- }
- }
- return result;
- }
-
- public boolean didGetResponse() {
- return (status != Status.READY);
- }
-
- public boolean isSuccess() {
- return (status == Status.SUCCESS);
- }
-
- public boolean isFailure() {
- return (status == Status.FAILURE);
- }
-
- public String getErrorMessage() {
- if (status == Status.SUCCESS) {
- return "";
- }
- return err.getMessage();
- }
-
- @Override
- public boolean cancel(boolean arg0) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return sem.availablePermits() > 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java
deleted file mode 100644
index c8ea055..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Invocation;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Response;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@Deprecated
-public class NettyAsyncRpcProxy extends NettyClientBase {
- private static Log LOG = LogFactory.getLog(NettyAsyncRpcProxy.class);
-
- private final Class<?> protocol;
- private final ClientHandler handler;
- InetSocketAddress addr;
-
- private final AtomicInteger sequence = new AtomicInteger(0);
- private final Map<Integer, ResponseRpcCallback> requests =
- new ConcurrentHashMap<Integer, ResponseRpcCallback>();
-
- @SuppressWarnings("rawtypes")
- private final Map<String, Class> returnTypeMap = new HashMap<String, Class>();
-
- public NettyAsyncRpcProxy(Class<?> server, Class<?> client,
- InetSocketAddress addr) {
- this.protocol = client;
-
- this.handler = new ClientHandler();
- ChannelPipelineFactory pipeFactory =
- new ProtoPipelineFactory(handler, Response.getDefaultInstance());
-
- for (Method method : server.getMethods()) {
- returnTypeMap.put(method.getName(), method.getReturnType());
- }
-
- super.init(addr, pipeFactory);
- }
-
- public Object getProxy() {
- return Proxy.newProxyInstance(protocol.getClassLoader(),
- new Class[] { protocol }, new Invoker(getChannel()));
- }
-
- public class Invoker implements InvocationHandler {
- private final Channel channel;
-
- public Invoker(Channel channel) {
- this.channel = channel;
- }
-
- @SuppressWarnings("rawtypes")
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
-
- int seqId = sequence.incrementAndGet();
- Invocation.Builder builder = Invocation.newBuilder();
-
- if (args != null) {
- for (int i = 1; i < args.length; i++) {
- ByteString str = ((Message) args[i]).toByteString();
- builder.addParam(str);
- }
- }
- Callback userCallBack = (Callback) args[0];
- ResponseRpcCallback rpcCallback =
- new ResponseRpcCallback(returnTypeMap.get(method.getName()),
- userCallBack);
-
- Invocation request =
- builder.setId(seqId).setMethodName(method.getName()).build();
- requests.put(seqId, rpcCallback);
- this.channel.write(request);
- return null;
- }
-
- public void shutdown() {
- LOG.info("[RPC] Client terminates connection "
- + channel.getRemoteAddress());
- this.channel.close().awaitUninterruptibly();
- bootstrap.releaseExternalResources();
- }
- }
-
- private class ResponseRpcCallback implements RpcCallback<Response> {
- @SuppressWarnings("rawtypes")
- private final Callback callback;
- private Response response;
- private Class<?> retType;
-
- @SuppressWarnings("rawtypes")
- public ResponseRpcCallback(Class clazz, Callback callback) {
- this.callback = callback;
- this.retType = clazz;
- }
-
- @SuppressWarnings("unchecked")
- public void run(Response message) {
- response = message;
-
- Object retObj = null;
- if (response != null) {
- if (!response.getHasReturn()) {
- if (response.hasExceptionMessage()) {
- callback.onFailure(new RemoteException(response
- .getExceptionMessage()));
- return;
- }
- retObj = null;
- } else {
- try {
- Method mtd =
- retType
- .getMethod("parseFrom", new Class[] { ByteString.class });
- retObj = mtd.invoke(null, response.getReturnValue());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- callback.onComplete(retObj);
- }
- }
-
- private class ClientHandler extends SimpleChannelUpstreamHandler {
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- Response response = (Response) e.getMessage();
- ResponseRpcCallback callback = requests.remove(response.getId());
-
- if (callback == null) {
- LOG.debug("dangling rpc call");
- } else {
- callback.run(response);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- LOG.error("[RPC] ERROR " + e.getChannel().getRemoteAddress() + " "
- + e.getCause());
- e.getChannel().close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java
deleted file mode 100644
index 8954048..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Invocation;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Response;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@Deprecated
-public class NettyBlockingRpcProxy extends NettyClientBase {
-
- private static Log LOG = LogFactory.getLog(NettyBlockingRpcProxy.class);
-
- private final Class<?> protocol;
- private final ChannelPipelineFactory pipeFactory;
- private final ClientHandler handler;
- private final AtomicInteger sequence = new AtomicInteger(0);
- private Map<Integer, CallFuture> requests =
- new ConcurrentHashMap<Integer, CallFuture>();
-
- public NettyBlockingRpcProxy(Class<?> protocol, InetSocketAddress addr) {
- this.protocol = protocol;
- this.handler = new ClientHandler();
- this.pipeFactory =
- new ProtoPipelineFactory(handler, Response.getDefaultInstance());
- super.init(addr, pipeFactory);
- }
-
- public Object getProxy() {
- return Proxy.newProxyInstance(protocol.getClassLoader(),
- new Class[] { protocol }, new Invoker(getChannel()));
- }
-
- public String getExceptionMessage() {
- return handler.getExceptionMessage();
- }
-
- public class Invoker implements InvocationHandler {
- private final Channel channel;
-
- public Invoker(Channel channel) {
- this.channel = channel;
- }
-
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
-
- int nextSeqId = sequence.incrementAndGet();
-
- Invocation.Builder builder = Invocation.newBuilder();
-
- if (args != null) {
- for (int i = 0; i < args.length; i++) {
- ByteString str = ((Message) args[i]).toByteString();
- builder.addParam(str);
- }
- }
-
- Invocation request =
- builder.setId(nextSeqId).setMethodName(method.getName()).build();
-
- CallFuture callFuture = new CallFuture(method.getReturnType());
- requests.put(nextSeqId, callFuture);
- this.channel.write(request);
- Object retObj = callFuture.get();
- String exceptionMessage = handler.getExceptionMessage();
-
- if (exceptionMessage == "") {
- return retObj;
- } else {
- throw new RemoteException(exceptionMessage);
- }
- }
-
- public void shutdown() {
- LOG.info("[RPC] Client terminates connection "
- + channel.getRemoteAddress());
- this.channel.close().awaitUninterruptibly();
- bootstrap.releaseExternalResources();
- }
- }
-
- private class ClientHandler extends SimpleChannelUpstreamHandler {
- private String exceptionMessage = "";
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- Response response = (Response) e.getMessage();
- CallFuture callFuture = requests.get(response.getId());
-
- Object r = null;
- if (response != null) {
- if (!response.getHasReturn()) {
- if (response.hasExceptionMessage()) {
- this.exceptionMessage = response.getExceptionMessage();
- }
- response = null;
- } else {
- @SuppressWarnings("unchecked")
- Method mtd =
- callFuture.getReturnType().getMethod("parseFrom",
- new Class[] { ByteString.class });
- r = mtd.invoke(null, response.getReturnValue());
-
- }
- }
-
- if (callFuture == null) {
- LOG.debug("dangling rpc call");
- } else {
- callFuture.setResponse(r);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- LOG.error("[RPC] ERROR " + e.getChannel().getRemoteAddress() + " "
- + e.getCause());
- e.getChannel().close();
- }
-
- public String getExceptionMessage() {
- return this.exceptionMessage;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java
deleted file mode 100644
index a0fd9a2..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.net.InetSocketAddress;
-
-public class NettyRpc {
- @Deprecated
- public static NettyRpcServer getProtoParamRpcServer(Object instance,
- Class<?> interfaceClass, InetSocketAddress addr) {
- return new NettyRpcServer(instance, interfaceClass, addr);
- }
-
- @Deprecated
- public static Object getProtoParamAsyncRpcProxy(Class<?> serverClass,
- Class<?> clientClass, InetSocketAddress addr) {
- return new NettyAsyncRpcProxy(serverClass, clientClass, addr)
- .getProxy();
- }
-
- @Deprecated
- public static Object getProtoParamBlockingRpcProxy(Class<?> protocol,
- InetSocketAddress addr) {
- return new NettyBlockingRpcProxy(protocol, addr).getProxy();
- }
-}