You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/16 13:59:40 UTC

[1/2] TAJO-257: Unit tests occasionally fail. (hyunsik)

Updated Branches:
  refs/heads/master 202aa29b7 -> fd60ec395


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java
deleted file mode 100644
index 26541a4..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcServer.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Invocation;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Response;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-@Deprecated
-public class NettyRpcServer extends NettyServerBase {
-  private static Log LOG = LogFactory.getLog(NettyRpcServer.class);
-  private final Object instance;
-  private final Class<?> clazz;
-  private final ChannelPipelineFactory pipeline;
-  private Map<String, Method> methods;
-  private Map<String, Method> builderMethods;
-
-  @Deprecated
-  public NettyRpcServer(Object proxy, InetSocketAddress bindAddress) {
-    super(bindAddress);
-    this.instance = proxy;
-    this.clazz = instance.getClass();
-    this.methods = new HashMap<String, Method>();
-    this.builderMethods = new HashMap<String, Method>();
-    this.pipeline =
-        new ProtoPipelineFactory(new ServerHandler(),
-            Invocation.getDefaultInstance());
-
-    super.init(this.pipeline);
-    for (Method m : this.clazz.getDeclaredMethods()) {
-      String methodName = m.getName();
-      Class<?> params[] = m.getParameterTypes();
-      try {
-        methods.put(methodName, m);
-
-        Method mtd = params[0].getMethod("newBuilder", new Class[] {});
-        builderMethods.put(methodName, mtd);
-      } catch (Exception e) {
-        e.printStackTrace();
-        continue;
-      }
-    }
-  }
-
-  public NettyRpcServer(Object proxy, Class<?> interfaceClass,
-                        InetSocketAddress bindAddress) {
-    super(bindAddress);
-    this.instance = proxy;
-    this.clazz = instance.getClass();
-    this.methods = new HashMap<String, Method>();
-    this.builderMethods = new HashMap<String, Method>();
-    this.pipeline =
-        new ProtoPipelineFactory(new ServerHandler(),
-            Invocation.getDefaultInstance());
-
-    super.init(this.pipeline);
-    for (Method m : interfaceClass.getDeclaredMethods()) {
-      String methodName = m.getName();
-      Class<?> params[] = m.getParameterTypes();
-      try {
-        methods.put(methodName, this.clazz.getMethod(methodName, params));
-
-        Method mtd = params[0].getMethod("newBuilder", new Class[] {});
-        builderMethods.put(methodName, mtd);
-      } catch (Exception e) {
-        e.printStackTrace();
-        continue;
-      }
-    }
-  }
-
-  private class ServerHandler extends SimpleChannelUpstreamHandler {
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-      final Invocation request = (Invocation) e.getMessage();
-      Object res = null;
-      Response response = null;
-
-      try {
-        String methodName = request.getMethodName();
-        if (methods.containsKey(methodName) == false) {
-          throw new NoSuchMethodException(methodName);
-        }
-
-        Method method = methods.get(methodName);
-        Method builderGenMethod = builderMethods.get(methodName);
-        Builder builder =
-            (Builder) builderGenMethod.invoke(null, new Object[] {});
-        Message msg = builder.mergeFrom(request.getParam(0)).build();
-        res = method.invoke(instance, msg);
-
-      } catch (InvocationTargetException internalException) {
-        LOG.error(ExceptionUtils.getStackTrace(internalException
-            .getTargetException()));
-        response =
-            Response
-                .newBuilder()
-                .setId(request.getId())
-                .setHasReturn(false)
-                .setExceptionMessage(
-                    internalException.getTargetException().toString()).build();
-        e.getChannel().write(response);
-        return;
-      } catch (Exception otherException) {
-        otherException.printStackTrace();
-        response =
-            Response.newBuilder().setId(request.getId()).setHasReturn(false)
-                .setExceptionMessage(otherException.toString()).build();
-        e.getChannel().write(response);
-        return;
-      }
-
-      if (res == null) {
-        response =
-            Response.newBuilder().setId(request.getId()).setHasReturn(false)
-                .build();
-      } else {
-        ByteString str = ((Message) res).toByteString();
-        response =
-            Response.newBuilder().setId(request.getId()).setHasReturn(true)
-                .setReturnValue(str).build();
-      }
-      e.getChannel().write(response);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      e.getChannel().close();
-      LOG.error(e.getCause());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
deleted file mode 100644
index d78e4e1..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcClient.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
-import org.jboss.netty.channel.*;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ProtoAsyncRpcClient extends NettyClientBase {
-  private static final Log LOG = LogFactory.getLog(RpcProtos.class);
-
-  private final ChannelUpstreamHandler handler;
-  private final ChannelPipelineFactory pipeFactory;
-  private final ProxyRpcChannel rpcChannel;
-
-  private final AtomicInteger sequence = new AtomicInteger(0);
-  private final Map<Integer, ResponseCallback> requests =
-      new ConcurrentHashMap<Integer, ResponseCallback>();
-
-  private final Class<?> protocol;
-  private final Method stubMethod;
-
-  public ProtoAsyncRpcClient(final Class<?> protocol,
-                             final InetSocketAddress addr)
-      throws Exception {
-
-    this.protocol = protocol;
-    String serviceClassName = protocol.getName() + "$"
-        + protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
-
-    this.handler = new ClientChannelUpstreamHandler();
-    pipeFactory = new ProtoPipelineFactory(handler,
-        RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory);
-    rpcChannel = new ProxyRpcChannel(getChannel());
-  }
-
-  public <T> T getStub() throws Exception {
-    return (T) stubMethod.invoke(null, rpcChannel);
-  }
-
-  public RpcChannel getRpcChannel() {
-    return this.rpcChannel;
-  }
-
-  private class ProxyRpcChannel implements RpcChannel {
-    private final Channel channel;
-    private final ClientChannelUpstreamHandler handler;
-
-    public ProxyRpcChannel(Channel channel) {
-      this.channel = channel;
-      this.handler = channel.getPipeline()
-          .get(ClientChannelUpstreamHandler.class);
-
-      if (handler == null) {
-        throw new IllegalArgumentException("Channel does not have " +
-            "proper handler");
-      }
-    }
-
-    public void callMethod(final MethodDescriptor method,
-                           final RpcController controller,
-                           final Message param,
-                           final Message responseType,
-                           RpcCallback<Message> done) {
-
-      int nextSeqId = sequence.getAndIncrement();
-
-      Message rpcRequest = buildRequest(nextSeqId, method, param);
-
-      handler.registerCallback(nextSeqId,
-          new ResponseCallback(controller, responseType, done));
-      channel.write(rpcRequest);
-    }
-
-    private Message buildRequest(int seqId,
-                                 MethodDescriptor method,
-                                 Message param) {
-
-      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
-          .setId(seqId)
-          .setMethodName(method.getName());
-
-      if (param != null) {
-          requestBuilder.setRequestMessage(param.toByteString());
-      }
-
-      return requestBuilder.build();
-    }
-  }
-
-  private class ResponseCallback implements RpcCallback<RpcResponse> {
-    private final RpcController controller;
-    private final Message responsePrototype;
-    private final RpcCallback<Message> callback;
-
-    public ResponseCallback(RpcController controller,
-                            Message responsePrototype,
-                            RpcCallback<Message> callback) {
-      this.controller = controller;
-      this.responsePrototype = responsePrototype;
-      this.callback = callback;
-    }
-
-    public void run(RpcResponse rpcResponse) {
-
-      // if hasErrorMessage is true, it means rpc-level errors.
-      // it does not call the callback function
-      if (rpcResponse.hasErrorMessage()) {
-
-        if (controller != null) {
-          this.controller.setFailed(rpcResponse.getErrorMessage());
-        }
-        callback.run(null);
-        throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
-
-      } else { // if rpc call succeed
-
-        try {
-          Message responseMessage;
-          if (!rpcResponse.hasResponseMessage()) {
-            responseMessage = null;
-          } else {
-            responseMessage = responsePrototype.newBuilderForType().mergeFrom(
-                rpcResponse.getResponseMessage()).build();
-          }
-
-          callback.run(responseMessage);
-
-        } catch (InvalidProtocolBufferException e) {
-          throw new RemoteException(getErrorMessage(""), e);
-        }
-      }
-    }
-  }
-
-  private String getErrorMessage(String message) {
-    return "Exception [" + protocol.getCanonicalName() +
-        "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
-        getChannel().getRemoteAddress()) + ")]: " + message;
-  }
-
-  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
-
-    synchronized void registerCallback(int seqId, ResponseCallback callback) {
-
-      if (requests.containsKey(seqId)) {
-        throw new RemoteException(
-            getErrorMessage("Duplicate Sequence Id "+ seqId));
-      }
-
-      requests.put(seqId, callback);
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-
-      RpcResponse response = (RpcResponse) e.getMessage();
-      ResponseCallback callback = requests.remove(response.getId());
-
-      if (callback == null) {
-        LOG.warn("Dangling rpc call");
-      } else {
-        callback.run(response);
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      LOG.error(addr + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
-      e.getChannel().close();
-      throw new RemoteException(getErrorMessage(addr.toString()), e.getCause());
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
deleted file mode 100644
index 1a6ce29..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoAsyncRpcServer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-public class ProtoAsyncRpcServer extends NettyServerBase {
-  private static final Log LOG = LogFactory.getLog(ProtoAsyncRpcServer.class);
-
-  private final Service service;
-  private final ChannelPipelineFactory pipeline;
-
-  public ProtoAsyncRpcServer(final Class<?> protocol,
-                             final Object instance,
-                             final InetSocketAddress bindAddress)
-      throws Exception {
-    super(protocol.getSimpleName(), bindAddress);
-
-    String serviceClassName = protocol.getName() + "$" +
-        protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface");
-    Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
-    this.service = (Service) method.invoke(null, instance);
-
-    ServerHandler handler = new ServerHandler();
-    this.pipeline = new ProtoPipelineFactory(handler,
-        RpcRequest.getDefaultInstance());
-    super.init(this.pipeline);
-  }
-
-  private class ServerHandler extends SimpleChannelUpstreamHandler {
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-
-      final RpcRequest request = (RpcRequest) e.getMessage();
-
-      String methodName = request.getMethodName();
-      MethodDescriptor methodDescriptor = service.getDescriptorForType().
-          findMethodByName(methodName);
-
-      if (methodDescriptor == null) {
-        throw new RemoteCallException(request.getId(),
-            new NoSuchMethodException(methodName));
-      }
-
-      Message paramProto = null;
-      if (request.hasRequestMessage()) {
-        try {
-          paramProto = service.getRequestPrototype(methodDescriptor)
-                  .newBuilderForType().mergeFrom(request.getRequestMessage()).
-                  build();
-        } catch (Throwable t) {
-          throw new RemoteCallException(request.getId(), methodDescriptor, t);
-        }
-      }
-
-      final Channel channel = e.getChannel();
-      final RpcController controller = new NettyRpcController();
-
-      RpcCallback<Message> callback =
-          !request.hasId() ? null : new RpcCallback<Message>() {
-
-        public void run(Message returnValue) {
-
-          RpcResponse.Builder builder = RpcResponse.newBuilder()
-              .setId(request.getId());
-
-          if (returnValue != null) {
-            builder.setResponseMessage(returnValue.toByteString());
-          }
-
-          if (controller.failed()) {
-            builder.setErrorMessage(controller.errorText());
-          }
-
-          channel.write(builder.build());
-        }
-      };
-
-      service.callMethod(methodDescriptor, controller, paramProto, callback);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception{
-      if (e.getCause() instanceof RemoteCallException) {
-        RemoteCallException callException = (RemoteCallException) e.getCause();
-        e.getChannel().write(callException.getResponse());
-      }
-      throw new RemoteException(e.getCause());
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
deleted file mode 100644
index 2a0b01f..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcClient.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.apache.tajo.util.NetUtils;
-import org.jboss.netty.channel.*;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ProtoBlockingRpcClient extends NettyClientBase {
-  private static final Log LOG = LogFactory.getLog(RpcProtos.class);
-
-  private final ChannelUpstreamHandler handler;
-  private final ChannelPipelineFactory pipeFactory;
-  private final ProxyRpcChannel rpcChannel;
-
-  private final AtomicInteger sequence = new AtomicInteger(0);
-  private final Map<Integer, ProtoCallFuture> requests =
-      new ConcurrentHashMap<Integer, ProtoCallFuture>();
-
-  private final Class<?> protocol;
-  private final Method stubMethod;
-
-  public ProtoBlockingRpcClient(final Class<?> protocol,
-                                final InetSocketAddress addr)
-      throws Exception {
-
-    this.protocol = protocol;
-    String serviceClassName = protocol.getName() + "$"
-        + protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    stubMethod = serviceClass.getMethod("newBlockingStub",
-        BlockingRpcChannel.class);
-
-    this.handler = new ClientChannelUpstreamHandler();
-    pipeFactory = new ProtoPipelineFactory(handler,
-        RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory);
-    rpcChannel = new ProxyRpcChannel(getChannel());
-  }
-
-  public <T> T getStub() throws Exception {
-    return (T) stubMethod.invoke(null, rpcChannel);
-  }
-
-  public BlockingRpcChannel getBlockingRpcChannel() {
-    return this.rpcChannel;
-  }
-
-  private class ProxyRpcChannel implements BlockingRpcChannel {
-    private final Channel channel;
-    private final ClientChannelUpstreamHandler handler;
-
-    public ProxyRpcChannel(Channel channel) {
-      this.channel = channel;
-      this.handler = channel.getPipeline().
-          get(ClientChannelUpstreamHandler.class);
-
-      if (handler == null) {
-        throw new IllegalArgumentException("Channel does not have " +
-            "proper handler");
-      }
-    }
-
-    public Message callBlockingMethod(final MethodDescriptor method,
-                                      final RpcController controller,
-                                      final Message param,
-                                      final Message responsePrototype)
-        throws ServiceException {
-
-      int nextSeqId = sequence.getAndIncrement();
-
-      Message rpcRequest = buildRequest(nextSeqId, method, param);
-
-      ProtoCallFuture callFuture =
-          new ProtoCallFuture(controller, responsePrototype);
-      requests.put(nextSeqId, callFuture);
-      channel.write(rpcRequest);
-
-      try {
-        return callFuture.get();
-      } catch (Throwable t) {
-        throw new RemoteException(t);
-      }
-    }
-
-    private Message buildRequest(int seqId,
-                                 MethodDescriptor method,
-                                 Message param) {
-      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
-          .setId(seqId)
-          .setMethodName(method.getName());
-
-      if (param != null) {
-        requestBuilder.setRequestMessage(param.toByteString());
-      }
-
-      return requestBuilder.build();
-    }
-  }
-
-  private String getErrorMessage(String message) {
-    if(protocol != null && getChannel() != null) {
-      return "Exception [" + protocol.getCanonicalName() +
-          "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
-          getChannel().getRemoteAddress()) + ")]: " + message;
-    } else {
-      return "Exception " + message;
-    }
-  }
-
-  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-
-      RpcResponse rpcResponse = (RpcResponse) e.getMessage();
-      ProtoCallFuture callback = requests.remove(rpcResponse.getId());
-
-      if (callback == null) {
-        LOG.warn("Dangling rpc call");
-      } else {
-        if (rpcResponse.hasErrorMessage()) {
-          if (callback.controller != null) {
-            callback.setFailed(rpcResponse.getErrorMessage());
-          }
-          throw new RemoteException(
-              getErrorMessage(rpcResponse.getErrorMessage()));
-        } else {
-          Message responseMessage;
-
-          if (!rpcResponse.hasResponseMessage()) {
-            responseMessage = null;
-          } else {
-            responseMessage =
-                callback.returnType.newBuilderForType().
-                    mergeFrom(rpcResponse.getResponseMessage()).build();
-          }
-
-          callback.setResponse(responseMessage);
-        }
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      e.getChannel().close();
-      throw new RemoteException(getErrorMessage(""), e.getCause());
-    }
-  }
-
-  class ProtoCallFuture implements Future<Message> {
-    private Semaphore sem = new Semaphore(0);
-    private Message response = null;
-    private Message returnType;
-
-    private RpcController controller;
-
-    public ProtoCallFuture(RpcController controller, Message message) {
-      this.controller = controller;
-      this.returnType = message;
-    }
-
-    @Override
-    public boolean cancel(boolean arg0) {
-      return false;
-    }
-
-    @Override
-    public Message get() throws InterruptedException, ExecutionException {
-      sem.acquire();
-      return response;
-    }
-
-    @Override
-    public Message get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      if(sem.tryAcquire(timeout, unit)) {
-        return response;
-      } else {
-        throw new TimeoutException();
-      }
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public boolean isDone() {
-      return sem.availablePermits() > 0;
-    }
-
-    public void setResponse(Message response) {
-      this.response = response;
-      sem.release();
-    }
-
-    public void setFailed(String errorText) {
-      this.controller.setFailed(errorText);
-      sem.release();
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
deleted file mode 100644
index cc47b7b..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoBlockingRpcServer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-public class ProtoBlockingRpcServer extends NettyServerBase {
-  private static Log LOG = LogFactory.getLog(ProtoBlockingRpcServer.class);
-  private final BlockingService service;
-  private final ChannelPipelineFactory pipeline;
-
-  public ProtoBlockingRpcServer(final Class<?> protocol,
-                                final Object instance,
-                                final InetSocketAddress bindAddress)
-      throws Exception {
-
-    super(protocol.getSimpleName(), bindAddress);
-
-    String serviceClassName = protocol.getName() + "$" +
-        protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    Class<?> interfaceClass = Class.forName(serviceClassName +
-        "$BlockingInterface");
-    Method method = serviceClass.getMethod(
-        "newReflectiveBlockingService", interfaceClass);
-
-    this.service = (BlockingService) method.invoke(null, instance);
-    this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
-        RpcRequest.getDefaultInstance());
-
-    super.init(this.pipeline);
-  }
-
-  private class ServerHandler extends SimpleChannelUpstreamHandler {
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-      final RpcRequest request = (RpcRequest) e.getMessage();
-
-      String methodName = request.getMethodName();
-      MethodDescriptor methodDescriptor =
-          service.getDescriptorForType().findMethodByName(methodName);
-
-      if (methodDescriptor == null) {
-        throw new RemoteCallException(request.getId(),
-            new NoSuchMethodException(methodName));
-      }
-      Message paramProto = null;
-      if (request.hasRequestMessage()) {
-        try {
-          paramProto = service.getRequestPrototype(methodDescriptor)
-              .newBuilderForType().mergeFrom(request.getRequestMessage()).
-                  build();
-
-        } catch (Throwable t) {
-          throw new RemoteCallException(request.getId(), methodDescriptor, t);
-        }
-      }
-      Message returnValue;
-      RpcController controller = new NettyRpcController();
-
-      try {
-        returnValue = service.callBlockingMethod(methodDescriptor,
-            controller, paramProto);
-      } catch (Throwable t) {
-        throw new RemoteCallException(request.getId(), methodDescriptor, t);
-      }
-
-      RpcResponse.Builder builder =
-          RpcResponse.newBuilder().setId(request.getId());
-
-      if (returnValue != null) {
-        builder.setResponseMessage(returnValue.toByteString());
-      }
-
-      if (controller.failed()) {
-        builder.setErrorMessage(controller.errorText());
-      }
-      e.getChannel().write(builder.build());
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      if (e.getCause() instanceof RemoteCallException) {
-        RemoteCallException callException = (RemoteCallException) e.getCause();
-        e.getChannel().write(callException.getResponse());
-      }
-
-      throw new RemoteException(e.getCause());
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java
deleted file mode 100644
index f38ab9d..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkAsyncRPC.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.benchmark;
-
-import org.apache.tajo.rpc.Callback;
-import org.apache.tajo.rpc.NettyRpc;
-import org.apache.tajo.rpc.NettyRpcServer;
-import org.apache.tajo.rpc.RemoteException;
-
-import java.net.InetSocketAddress;
-
-import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-
-public class BenchmarkAsyncRPC {
-
-  public static class ClientWrapper extends Thread {
-    public void run() {
-      BenchmarkClientInterface service;
-      service =
-          (BenchmarkClientInterface) NettyRpc.getProtoParamAsyncRpcProxy(
-              BenchmarkServerInterface.class, BenchmarkClientInterface.class,
-              new InetSocketAddress(15010));
-
-      long start = System.currentTimeMillis();
-      Callback<StringProto> cb = new Callback<StringProto>();
-      StringProto ps = StringProto.newBuilder().setValue("ABCD").build();
-      for (int i = 0; i < 100000; i++) {
-        service.shoot(cb, ps);
-      }
-      long end = System.currentTimeMillis();
-
-      System.out.println("elapsed time: " + (end - start) + "msc");
-    }
-  }
-
-  public static interface BenchmarkClientInterface {
-    public void shoot(Callback<StringProto> ret, StringProto l)
-        throws RemoteException;
-  }
-
-  public static interface BenchmarkServerInterface {
-    public StringProto shoot(StringProto l) throws RemoteException;
-  }
-
-  public static class BenchmarkImpl implements BenchmarkServerInterface {
-    @Override
-    public StringProto shoot(StringProto l) {
-      return l;
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    NettyRpcServer rpcServer =
-        NettyRpc.getProtoParamRpcServer(new BenchmarkImpl(),
-            BenchmarkServerInterface.class, new InetSocketAddress(15010));
-    rpcServer.start();
-    Thread.sleep(1000);
-
-    int numThreads = 1;
-    ClientWrapper client[] = new ClientWrapper[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      client[i] = new ClientWrapper();
-    }
-
-    for (int i = 0; i < numThreads; i++) {
-      client[i].start();
-    }
-
-    for (int i = 0; i < numThreads; i++) {
-      client[i].join();
-    }
-
-    rpcServer.shutdown();
-    System.exit(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java
deleted file mode 100644
index 5ca9bd0..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkBlockingRPC.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.benchmark;
-
-import org.apache.tajo.rpc.NettyRpc;
-import org.apache.tajo.rpc.NettyRpcServer;
-import org.apache.tajo.rpc.RemoteException;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-
-import java.net.InetSocketAddress;
-
-public class BenchmarkBlockingRPC {
-
-  public static class ClientWrapper extends Thread {
-    @SuppressWarnings("unused")
-    public void run() {
-      InetSocketAddress addr = new InetSocketAddress("localhost", 15001);
-      BenchmarkInterface proxy;
-      proxy =
-          (BenchmarkInterface) NettyRpc.getProtoParamBlockingRpcProxy(
-              BenchmarkInterface.class, addr);
-
-      long start = System.currentTimeMillis();
-      StringProto ps = StringProto.newBuilder().setValue("ABCD").build();
-      for (int i = 0; i < 100000; i++) {
-        try {
-          proxy.shoot(ps);
-        } catch (RemoteException e1) {
-          System.out.println(e1.getMessage());
-        }
-      }
-      long end = System.currentTimeMillis();
-      System.out.println("elapsed time: " + (end - start) + "msc");
-
-    }
-  }
-
-  public static interface BenchmarkInterface {
-    public StringProto shoot(StringProto l) throws RemoteException;
-  }
-
-  public static class BenchmarkImpl implements BenchmarkInterface {
-    @Override
-    public StringProto shoot(StringProto l) {
-      return l;
-    }
-  }
-
-  public static void main(String[] args) throws InterruptedException,
-      RemoteException {
-
-    NettyRpcServer server =
-        NettyRpc
-            .getProtoParamRpcServer(new BenchmarkImpl(),
-                BenchmarkInterface.class, new InetSocketAddress("localhost",
-                    15001));
-
-    server.start();
-    Thread.sleep(1000);
-
-    int numThreads = 1;
-    ClientWrapper client[] = new ClientWrapper[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      client[i] = new ClientWrapper();
-    }
-
-    for (int i = 0; i < numThreads; i++) {
-      client[i].start();
-    }
-
-    for (int i = 0; i < numThreads; i++) {
-      client[i].join();
-    }
-
-    server.shutdown();
-    System.exit(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java
deleted file mode 100644
index 6487fde..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/benchmark/BenchmarkHadoopRPC.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.benchmark;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.ipc.VersionedProtocol;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-public class BenchmarkHadoopRPC {
-
-  public static class ClientWrapper extends Thread {
-    @SuppressWarnings("unused")
-    public void run() {
-      InetSocketAddress addr = new InetSocketAddress("localhost", 15000);
-      BenchmarkInterface proxy = null;
-      try {
-        proxy =
-            RPC.waitForProxy(BenchmarkInterface.class, 1,
-                addr, new Configuration());
-      } catch (IOException e1) {
-        e1.printStackTrace();
-      }
-
-      long start = System.currentTimeMillis();
-      for (int i = 0; i < 100000; i++) {
-        proxy.shoot("ABCD");
-      }
-      long end = System.currentTimeMillis();
-      System.out.println("elapsed time: " + (end - start) + "msc");
-    }
-  }
-
-  public static interface BenchmarkInterface extends VersionedProtocol {
-    public String shoot(String l);
-  }
-
-  public static class BenchmarkImpl implements BenchmarkInterface {
-    @Override
-    public String shoot(String l) {
-      return l;
-    }
-
-    @Override
-    public long getProtocolVersion(String arg0, long arg1) throws IOException {
-      return 1l;
-    }
-
-    @Override
-    public ProtocolSignature getProtocolSignature(String protocol,
-                                                  long clientVersion, int clientMethodsHash) throws IOException {
-      ProtocolSignature ps = null;
-      try {
-        ps = ProtocolSignature.getProtocolSignature("benchmarkInterface", 0);
-      } catch (ClassNotFoundException e) {
-        e.printStackTrace();
-      }
-
-      return ps;
-    }
-  }
-
-  public static void main(String[] args) throws InterruptedException,
-      IOException {
-    Server server =
-        RPC.getServer(new BenchmarkImpl(), "localhost", 15000,
-            new Configuration());
-    server.start();
-    Thread.sleep(1000);
-
-    int numThreads = 1;
-    ClientWrapper client[] = new ClientWrapper[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      client[i] = new ClientWrapper();
-    }
-
-    for (int i = 0; i < numThreads; i++) {
-      client[i].start();
-    }
-
-    for (int i = 0; i < numThreads; i++) {
-      client[i].join();
-    }
-
-    server.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto b/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto
deleted file mode 100644
index f8cac2a..0000000
--- a/tajo-rpc/src/main/proto/ProtoParamRpcProtocol.proto
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc";
-option java_outer_classname = "ProtoParamRpcProtocol";
-option java_generic_services = true;
-
-import "ProtoParamRpcProtos.proto";
-
-service ProtoParamRpcService {
-	rpc invoke (Invocation) returns (Response);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto b/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto
deleted file mode 100644
index 5cde52f..0000000
--- a/tajo-rpc/src/main/proto/ProtoParamRpcProtos.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.rpc";
-option java_outer_classname = "ProtoParamRpcProtos";
-
-message Invocation {
-    required int32 id = 1;
-    required string method_name = 2;
-    repeated bytes param = 3;
-}
-
-message Response {
-    required int32 id = 1;
-    required bool has_return = 2;
-    optional bytes return_value = 3;
-    optional string exception_message = 4;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
new file mode 100644
index 0000000..9ae883b
--- /dev/null
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.apache.tajo.util.NetUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+public class TestAsyncRpc {
+  private static Log LOG = LogFactory.getLog(TestAsyncRpc.class);
+  private static String MESSAGE = "TestAsyncRpc";
+
+  double sum;
+  String echo;
+
+  static AsyncRpcServer server;
+  static AsyncRpcClient client;
+  static Interface stub;
+  static DummyProtocolAsyncImpl service;
+
+  @Before
+  public void setUp() throws Exception {
+    service = new DummyProtocolAsyncImpl();
+    server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0));
+    server.start();
+    client = new AsyncRpcClient(DummyProtocol.class,
+        NetUtils.getConnectAddress(server.getListenAddress()));
+    stub = client.getStub();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if(client != null) {
+      client.close();
+    }
+    if(server != null) {
+      server.shutdown();
+    }
+  }
+
+  boolean calledMarker = false;
+  @Test
+  public void testRpc() throws Exception {
+
+    SumRequest sumRequest = SumRequest.newBuilder()
+        .setX1(1)
+        .setX2(2)
+        .setX3(3.15d)
+        .setX4(2.0f).build();
+
+    stub.sum(null, sumRequest, new RpcCallback<SumResponse>() {
+      @Override
+      public void run(SumResponse parameter) {
+        sum = parameter.getResult();
+        assertTrue(8.15d == sum);
+      }
+    });
+
+
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
+      @Override
+      public void run(EchoMessage parameter) {
+        echo = parameter.getMessage();
+        assertEquals(MESSAGE, echo);
+        calledMarker = true;
+      }
+    };
+    stub.echo(null, echoMessage, callback);
+    Thread.sleep(1000);
+    assertTrue(calledMarker);
+  }
+
+  private CountDownLatch testNullLatch;
+
+  @Test
+  public void testGetNull() throws Exception {
+    testNullLatch = new CountDownLatch(1);
+    stub.getNull(null, null, new RpcCallback<EchoMessage>() {
+      @Override
+      public void run(EchoMessage parameter) {
+        assertNull(parameter);
+        LOG.info("testGetNull retrieved");
+        testNullLatch.countDown();
+      }
+    });
+    testNullLatch.await(1000, TimeUnit.MILLISECONDS);
+    assertTrue(service.getNullCalled);
+  }
+
+  @Test
+  public void testCallFuture() throws Exception {
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.deley(null, echoMessage, future);
+
+    assertFalse(future.isDone());
+    assertEquals(future.get(), echoMessage);
+    assertTrue(future.isDone());
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCallFutureTimeout() throws Exception {
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.deley(null, echoMessage, future);
+
+    assertFalse(future.isDone());
+    assertEquals(future.get(1, TimeUnit.SECONDS), echoMessage);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
new file mode 100644
index 0000000..04bb13b
--- /dev/null
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
+import org.apache.tajo.util.NetUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class TestBlockingRpc {
+  public static String MESSAGE = "TestBlockingRpc";
+
+  private BlockingRpcServer server;
+  private BlockingRpcClient client;
+  private BlockingInterface stub;
+  private DummyProtocolBlockingImpl service;
+
+  @Before
+  public void setUp() throws Exception {
+    service = new DummyProtocolBlockingImpl();
+    server = new BlockingRpcServer(DummyProtocol.class, service,
+        new InetSocketAddress("127.0.0.1", 0));
+    server.start();
+    client = new BlockingRpcClient(DummyProtocol.class,
+        NetUtils.getConnectAddress(server.getListenAddress()));
+    stub = client.getStub();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if(client != null) {
+      client.close();
+    }
+    if(server != null) {
+      server.shutdown();
+    }
+  }
+
+  @Test
+  public void testRpc() throws Exception {
+    SumRequest request = SumRequest.newBuilder()
+        .setX1(1)
+        .setX2(2)
+        .setX3(3.15d)
+        .setX4(2.0f).build();
+    SumResponse response1 = stub.sum(null, request);
+    assertTrue(8.15d == response1.getResult());
+
+    EchoMessage message = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    EchoMessage response2 = stub.echo(null, message);
+    assertEquals(MESSAGE, response2.getMessage());
+  }
+
+  @Test
+  public void testGetNull() throws Exception {
+    assertNull(stub.getNull(null, null));
+    assertTrue(service.getNullCalled);
+  }
+
+  @Test
+  public void testShutdown() throws Exception {
+    final StringBuilder error = new StringBuilder();
+    Thread callThread = new Thread() {
+      public void run() {
+        try {
+          EchoMessage message = EchoMessage.newBuilder()
+              .setMessage(MESSAGE)
+              .build();
+          stub.deley(null, message);
+        } catch (Exception e) {
+          e.printStackTrace();
+          error.append(e.getMessage());
+        }
+        synchronized(error) {
+          error.notifyAll();
+        }
+      }
+    };
+
+    callThread.start();
+
+    final CountDownLatch latch = new CountDownLatch(1);
+    Thread shutdownThread = new Thread() {
+      public void run() {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+        try {
+          server.shutdown();
+          server = null;
+          latch.countDown();
+        } catch (Throwable e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    shutdownThread.start();
+
+    latch.await(5 * 1000, TimeUnit.MILLISECONDS);
+
+    assertTrue(latch.getCount() == 0);
+
+    synchronized(error) {
+      error.wait(5 * 1000);
+    }
+
+    if(!error.toString().isEmpty()) {
+      fail(error.toString());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
deleted file mode 100644
index 0d01a49..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoAsyncRpc.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.apache.tajo.util.NetUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-
-public class TestProtoAsyncRpc {
-  private static Log LOG = LogFactory.getLog(TestProtoAsyncRpc.class);
-  private static String MESSAGE = "TestProtoAsyncRpc";
-
-  double sum;
-  String echo;
-
-  static ProtoAsyncRpcServer server;
-  static ProtoAsyncRpcClient client;
-  static Interface stub;
-  static DummyProtocolAsyncImpl service;
-
-  @Before
-  public void setUp() throws Exception {
-    service = new DummyProtocolAsyncImpl();
-    server = new ProtoAsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0));
-    server.start();
-    client = new ProtoAsyncRpcClient(DummyProtocol.class,
-        NetUtils.getConnectAddress(server.getListenAddress()));
-    stub = client.getStub();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if(client != null) {
-      client.close();
-    }
-    if(server != null) {
-      server.shutdown();
-    }
-  }
-
-  boolean calledMarker = false;
-  @Test
-  public void testRpc() throws Exception {
-
-    SumRequest sumRequest = SumRequest.newBuilder()
-        .setX1(1)
-        .setX2(2)
-        .setX3(3.15d)
-        .setX4(2.0f).build();
-
-    stub.sum(null, sumRequest, new RpcCallback<SumResponse>() {
-      @Override
-      public void run(SumResponse parameter) {
-        sum = parameter.getResult();
-        assertTrue(8.15d == sum);
-      }
-    });
-
-
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
-      @Override
-      public void run(EchoMessage parameter) {
-        echo = parameter.getMessage();
-        assertEquals(MESSAGE, echo);
-        calledMarker = true;
-      }
-    };
-    stub.echo(null, echoMessage, callback);
-    Thread.sleep(1000);
-    assertTrue(calledMarker);
-  }
-
-  private CountDownLatch testNullLatch;
-
-  @Test
-  public void testGetNull() throws Exception {
-    testNullLatch = new CountDownLatch(1);
-    stub.getNull(null, null, new RpcCallback<EchoMessage>() {
-      @Override
-      public void run(EchoMessage parameter) {
-        assertNull(parameter);
-        LOG.info("testGetNull retrieved");
-        testNullLatch.countDown();
-      }
-    });
-    testNullLatch.await(1000, TimeUnit.MILLISECONDS);
-    assertTrue(service.getNullCalled);
-  }
-
-  @Test
-  public void testCallFuture() throws Exception {
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    CallFuture2<EchoMessage> future = new CallFuture2<EchoMessage>();
-    stub.deley(null, echoMessage, future);
-
-    assertFalse(future.isDone());
-    assertEquals(future.get(), echoMessage);
-    assertTrue(future.isDone());
-  }
-
-  @Test(expected = TimeoutException.class)
-  public void testCallFutureTimeout() throws Exception {
-    EchoMessage echoMessage = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    CallFuture2<EchoMessage> future = new CallFuture2<EchoMessage>();
-    stub.deley(null, echoMessage, future);
-
-    assertFalse(future.isDone());
-    assertEquals(future.get(1, TimeUnit.SECONDS), echoMessage);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
deleted file mode 100644
index 732f7f1..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestProtoBlockingRpc.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.apache.tajo.util.NetUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-public class TestProtoBlockingRpc {
-  public static String MESSAGE = "TestProtoBlockingRpc";
-
-  private ProtoBlockingRpcServer server;
-  private ProtoBlockingRpcClient client;
-  private BlockingInterface stub;
-  private DummyProtocolBlockingImpl service;
-
-  @Before
-  public void setUp() throws Exception {
-    service = new DummyProtocolBlockingImpl();
-    server = new ProtoBlockingRpcServer(DummyProtocol.class, service,
-        new InetSocketAddress("127.0.0.1", 0));
-    server.start();
-    client = new ProtoBlockingRpcClient(DummyProtocol.class,
-        NetUtils.getConnectAddress(server.getListenAddress()));
-    stub = client.getStub();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if(client != null) {
-      client.close();
-    }
-    if(server != null) {
-      server.shutdown();
-    }
-  }
-
-  @Test
-  public void testRpc() throws Exception {
-    SumRequest request = SumRequest.newBuilder()
-        .setX1(1)
-        .setX2(2)
-        .setX3(3.15d)
-        .setX4(2.0f).build();
-    SumResponse response1 = stub.sum(null, request);
-    assertTrue(8.15d == response1.getResult());
-
-    EchoMessage message = EchoMessage.newBuilder()
-        .setMessage(MESSAGE).build();
-    EchoMessage response2 = stub.echo(null, message);
-    assertEquals(MESSAGE, response2.getMessage());
-  }
-
-  @Test
-  public void testGetNull() throws Exception {
-    assertNull(stub.getNull(null, null));
-    assertTrue(service.getNullCalled);
-  }
-
-  @Test
-  public void testShutdown() throws Exception {
-    final StringBuilder error = new StringBuilder();
-    Thread callThread = new Thread() {
-      public void run() {
-        try {
-          EchoMessage message = EchoMessage.newBuilder()
-              .setMessage(MESSAGE)
-              .build();
-          stub.deley(null, message);
-        } catch (Exception e) {
-          e.printStackTrace();
-          error.append(e.getMessage());
-        }
-        synchronized(error) {
-          error.notifyAll();
-        }
-      }
-    };
-
-    callThread.start();
-
-    final CountDownLatch latch = new CountDownLatch(1);
-    Thread shutdownThread = new Thread() {
-      public void run() {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-        }
-        try {
-          server.shutdown();
-          server = null;
-          latch.countDown();
-        } catch (Throwable e) {
-          e.printStackTrace();
-        }
-      }
-    };
-    shutdownThread.start();
-
-    latch.await(5 * 1000, TimeUnit.MILLISECONDS);
-
-    assertTrue(latch.getCount() == 0);
-
-    synchronized(error) {
-      error.wait(5 * 1000);
-    }
-
-    if(!error.toString().isEmpty()) {
-      fail(error.toString());
-    }
-  }
-}
\ No newline at end of file


[2/2] git commit: TAJO-257: Unit tests occasionally fail. (hyunsik)

Posted by hy...@apache.org.
TAJO-257: Unit tests occasionally fail. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fd60ec39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fd60ec39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fd60ec39

Branch: refs/heads/master
Commit: fd60ec395b951764caaed8a8ebe5bf5bef1eb96f
Parents: 202aa29
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Oct 16 20:59:17 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Oct 16 20:59:17 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tajo/catalog/CatalogClient.java  |   6 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   6 +-
 .../java/org/apache/tajo/client/TajoClient.java |  14 +-
 .../tajo/engine/json/FragmentDeserializer.java  |  54 -----
 .../apache/tajo/master/TajoContainerProxy.java  |  10 +-
 .../tajo/master/TajoMasterClientService.java    |   6 +-
 .../apache/tajo/master/TajoMasterService.java   |   6 +-
 .../master/querymaster/QueryInProgress.java     |   6 +-
 .../tajo/master/querymaster/QueryMaster.java    |   9 +-
 .../master/querymaster/QueryMasterTask.java     |   8 +-
 .../tajo/worker/TajoResourceAllocator.java      |   6 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  12 +-
 .../tajo/worker/TajoWorkerClientService.java    |   6 +-
 .../tajo/worker/TajoWorkerManagerService.java   |   6 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  17 +-
 tajo-rpc/pom.xml                                |   2 -
 .../org/apache/tajo/rpc/AsyncRpcClient.java     | 206 ++++++++++++++++
 .../org/apache/tajo/rpc/AsyncRpcServer.java     | 124 ++++++++++
 .../org/apache/tajo/rpc/BlockingRpcClient.java  | 236 +++++++++++++++++++
 .../org/apache/tajo/rpc/BlockingRpcServer.java  | 120 ++++++++++
 .../java/org/apache/tajo/rpc/CallFuture.java    |  65 +++--
 .../java/org/apache/tajo/rpc/CallFuture2.java   |  74 ------
 .../main/java/org/apache/tajo/rpc/Callback.java | 112 ---------
 .../org/apache/tajo/rpc/NettyAsyncRpcProxy.java | 177 --------------
 .../apache/tajo/rpc/NettyBlockingRpcProxy.java  | 157 ------------
 .../main/java/org/apache/tajo/rpc/NettyRpc.java |  42 ----
 .../org/apache/tajo/rpc/NettyRpcServer.java     | 161 -------------
 .../apache/tajo/rpc/ProtoAsyncRpcClient.java    | 206 ----------------
 .../apache/tajo/rpc/ProtoAsyncRpcServer.java    | 124 ----------
 .../apache/tajo/rpc/ProtoBlockingRpcClient.java | 236 -------------------
 .../apache/tajo/rpc/ProtoBlockingRpcServer.java | 120 ----------
 .../tajo/rpc/benchmark/BenchmarkAsyncRPC.java   |  92 --------
 .../rpc/benchmark/BenchmarkBlockingRPC.java     |  94 --------
 .../tajo/rpc/benchmark/BenchmarkHadoopRPC.java  | 107 ---------
 .../src/main/proto/ProtoParamRpcProtocol.proto  |  25 --
 .../src/main/proto/ProtoParamRpcProtos.proto    |  31 ---
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  | 148 ++++++++++++
 .../org/apache/tajo/rpc/TestBlockingRpc.java    | 141 +++++++++++
 .../org/apache/tajo/rpc/TestProtoAsyncRpc.java  | 148 ------------
 .../apache/tajo/rpc/TestProtoBlockingRpc.java   | 141 -----------
 41 files changed, 1068 insertions(+), 2195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 074416f..433d782 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -201,6 +201,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-257: Unit tests occassionally fail. (hyunsik)
+
     TAJO-169: the default TAJO_WORKER_STANDBY_MODE in tajo-env.sh is wrong. 
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 5190998..6240631 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.rpc.ProtoBlockingRpcClient;
+import org.apache.tajo.rpc.BlockingRpcClient;
 import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
@@ -34,7 +34,7 @@ import java.net.InetSocketAddress;
  */
 public class CatalogClient extends AbstractCatalogClient {
   private final Log LOG = LogFactory.getLog(CatalogClient.class);
-  private ProtoBlockingRpcClient client;
+  private BlockingRpcClient client;
 
   /**
    * @throws java.io.IOException
@@ -53,7 +53,7 @@ public class CatalogClient extends AbstractCatalogClient {
     String addrStr = NetUtils.normalizeInetSocketAddress(serverAddr);
     LOG.info("Trying to connect the catalog (" + addrStr + ")");
     try {
-      client = new ProtoBlockingRpcClient(CatalogProtocol.class, serverAddr);
+      client = new BlockingRpcClient(CatalogProtocol.class, serverAddr);
       setStub((BlockingInterface) client.getStub());
     } catch (Exception e) {
       throw new IOException("Can't connect the catalog server (" + addrStr +")");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index f15d962..62096b1 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -33,7 +33,7 @@ import org.apache.tajo.catalog.store.DerbyStore;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
@@ -70,7 +70,7 @@ public class CatalogServer extends AbstractService {
       List<FunctionDescProto>>();
 
   // RPC variables
-  private ProtoBlockingRpcServer rpcServer;
+  private BlockingRpcServer rpcServer;
   private InetSocketAddress bindAddress;
   private String bindAddressStr;
   final CatalogProtocolHandler handler;
@@ -137,7 +137,7 @@ public class CatalogServer extends AbstractService {
     String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
     try {
-      this.rpcServer = new ProtoBlockingRpcServer(
+      this.rpcServer = new BlockingRpcServer(
           CatalogProtocol.class,
           handler, initIsa);
       this.rpcServer.start();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 324ced4..19fa618 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -36,7 +36,7 @@ import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
-import org.apache.tajo.rpc.ProtoBlockingRpcClient;
+import org.apache.tajo.rpc.BlockingRpcClient;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.NetUtils;
 
@@ -52,14 +52,14 @@ public class TajoClient {
   private final Log LOG = LogFactory.getLog(TajoClient.class);
 
   private final TajoConf conf;
-  private ProtoBlockingRpcClient tasjoMasterClient;
+  private BlockingRpcClient tasjoMasterClient;
   private TajoMasterClientProtocolService.BlockingInterface tajoMasterService;
 
   private Map<QueryId, QueryMasterClientProtocolService.BlockingInterface> queryMasterConnectionMap =
           new HashMap<QueryId, QueryMasterClientProtocolService.BlockingInterface>();
 
-  private Map<QueryId, ProtoBlockingRpcClient> queryMasterClientMap =
-          new HashMap<QueryId, ProtoBlockingRpcClient>();
+  private Map<QueryId, BlockingRpcClient> queryMasterClientMap =
+          new HashMap<QueryId, BlockingRpcClient>();
 
   public TajoClient(TajoConf conf) throws IOException {
     this.conf = conf;
@@ -81,7 +81,7 @@ public class TajoClient {
 
   private void connect(InetSocketAddress addr) throws IOException {
     try {
-      tasjoMasterClient = new ProtoBlockingRpcClient(TajoMasterClientProtocol.class, addr);
+      tasjoMasterClient = new BlockingRpcClient(TajoMasterClientProtocol.class, addr);
       tajoMasterService = tasjoMasterClient.getStub();
     } catch (Exception e) {
       throw new IOException(e);
@@ -91,7 +91,7 @@ public class TajoClient {
   public void close() {
     tasjoMasterClient.close();
 
-    for(ProtoBlockingRpcClient eachClient: queryMasterClientMap.values()) {
+    for(BlockingRpcClient eachClient: queryMasterClientMap.values()) {
       eachClient.close();
     }
     queryMasterClientMap.clear();
@@ -177,7 +177,7 @@ public class TajoClient {
   private void connectionToQueryMaster(QueryId queryId, String queryMasterHost, int queryMasterPort) {
     try {
       InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterHost, queryMasterPort);
-      ProtoBlockingRpcClient client = new ProtoBlockingRpcClient(QueryMasterClientProtocol.class, addr);
+      BlockingRpcClient client = new BlockingRpcClient(QueryMasterClientProtocol.class, addr);
       QueryMasterClientProtocolService.BlockingInterface service = client.getStub();
 
       queryMasterConnectionMap.put(queryId, service);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
deleted file mode 100644
index 63dd1fa..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.json;
-
-import com.google.gson.*;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMetaImpl;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.storage.Fragment;
-
-import java.lang.reflect.Type;
-
-public class FragmentDeserializer implements JsonDeserializer<Fragment> {
-
-	@Override
-	public Fragment deserialize(JsonElement json, Type type,
-			JsonDeserializationContext ctx) throws JsonParseException {
-		Gson gson = CoreGsonHelper.getInstance();
-		JsonObject fragObj = json.getAsJsonObject();
-		JsonObject metaObj = fragObj.get("meta").getAsJsonObject();
-		TableMetaImpl meta = new TableMetaImpl(
-		    gson.fromJson(metaObj.get("schema"), Schema.class),
-				gson.fromJson(metaObj.get("storeType"), StoreType.class), 
-				gson.fromJson(metaObj.get("options"), Options.class));
-		Fragment fragment = new Fragment(fragObj.get("tabletId").getAsString(), 
-				gson.fromJson(fragObj.get("path"), Path.class), 
-				meta, 
-				fragObj.get("startOffset").getAsLong(), 
-				fragObj.get("length").getAsLong());
-		return fragment;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 5f17c71..846343d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -30,8 +30,8 @@ import org.apache.tajo.master.event.QueryEventType;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -62,13 +62,13 @@ public class TajoContainerProxy extends ContainerProxy {
   }
 
   private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
-    ProtoAsyncRpcClient tajoWorkerRpc = null;
+    AsyncRpcClient tajoWorkerRpc = null;
     try {
       InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
           .getTajoWorkerManagerService().getBindAddr();
 
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
-      tajoWorkerRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+      tajoWorkerRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
 
       TajoWorkerProtocol.RunExecutionBlockRequestProto request =
@@ -93,8 +93,8 @@ public class TajoContainerProxy extends ContainerProxy {
   }
 
   class AyncRpcClose extends Thread {
-    ProtoAsyncRpcClient client;
-    public AyncRpcClose(ProtoAsyncRpcClient client) {
+    AsyncRpcClient client;
+    public AyncRpcClose(AsyncRpcClient client) {
       this.client = client;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index b6efd1d..38ac2a4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -45,7 +45,7 @@ import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryInfo;
 import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.RemoteException;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
@@ -61,7 +61,7 @@ public class TajoMasterClientService extends AbstractService {
   private final TajoConf conf;
   private final CatalogService catalog;
   private final TajoMasterClientProtocolServiceHandler clientHandler;
-  private ProtoBlockingRpcServer server;
+  private BlockingRpcServer server;
   private InetSocketAddress bindAddress;
 
   private final BoolProto BOOL_TRUE =
@@ -84,7 +84,7 @@ public class TajoMasterClientService extends AbstractService {
     String confClientServiceAddr = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
     try {
-      server = new ProtoBlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
+      server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
     } catch (Exception e) {
       LOG.error(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index d518dce..4b27e46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -29,7 +29,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.util.NetUtils;
@@ -43,7 +43,7 @@ public class TajoMasterService extends AbstractService {
   private final TajoMaster.MasterContext context;
   private final TajoConf conf;
   private final TajoMasterServiceHandler masterHandler;
-  private ProtoAsyncRpcServer server;
+  private AsyncRpcServer server;
   private InetSocketAddress bindAddress;
 
   private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
@@ -61,7 +61,7 @@ public class TajoMasterService extends AbstractService {
     String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
     try {
-      server = new ProtoAsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
+      server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
     } catch (Exception e) {
       LOG.error(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2c76ac8..0098a7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -34,8 +34,8 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 
 import java.net.InetSocketAddress;
@@ -60,7 +60,7 @@ public class QueryInProgress extends CompositeService {
 
   private final TajoMaster.MasterContext masterContext;
 
-  private ProtoAsyncRpcClient queryMasterRpc;
+  private AsyncRpcClient queryMasterRpc;
 
   private TajoWorkerProtocol.TajoWorkerProtocolService queryMasterRpcClient;
 
@@ -180,7 +180,7 @@ public class QueryInProgress extends CompositeService {
           queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
       LOG.info("Connect to QueryMaster:" + addr);
       //TODO Get Connection from pool
-      queryMasterRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+      queryMasterRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
       queryMasterRpcClient = queryMasterRpc.getStub();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 4690567..fa6790d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -34,13 +34,16 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.QueryStartEvent;
-import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.worker.TajoWorker;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -261,7 +264,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       if(queryMasterTask != null) {
         TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
-        CallFuture2 future = new CallFuture2();
+        CallFuture future = new CallFuture();
         workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, future);
         try {
           future.get(3, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index c386bf2..ae1508c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -41,13 +41,12 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.rpc.CallFuture2;
-import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.worker.AbstractResourceAllocator;
 import org.apache.tajo.worker.TajoResourceAllocator;
@@ -57,7 +56,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -161,7 +159,7 @@ public class QueryMasterTask extends CompositeService {
 
     LOG.info("Stopping QueryMasterTask:" + queryId);
 
-    CallFuture2 future = new CallFuture2();
+    CallFuture future = new CallFuture();
     queryMasterContext.getWorkerContext().getTajoMasterRpcClient()
         .stopQueryMaster(null, queryId.getProto(), future);
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index fa2ff14..385add1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -46,7 +46,7 @@ import org.apache.tajo.master.querymaster.SubQueryState;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.io.IOException;
@@ -221,8 +221,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     @Override
     public void run() {
       LOG.info("Start TajoWorkerAllocationThread");
-      CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
-          new CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+      CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
+          new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
 
       int requiredMemoryMBSlot = 512;  //TODO
       int requiredDiskSlots = 1;  //TODO

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 37b754a..99fce45 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -36,8 +36,8 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryMaster;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.CallFuture2;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.NetUtils;
@@ -73,7 +73,7 @@ public class TajoWorker extends CompositeService {
   private InetSocketAddress tajoMasterAddress;
 
   //to TajoMaster
-  private ProtoAsyncRpcClient tajoMasterRpc;
+  private AsyncRpcClient tajoMasterRpc;
 
   private TajoMasterProtocol.TajoMasterProtocolService tajoMasterRpcClient;
 
@@ -306,7 +306,7 @@ public class TajoWorker extends CompositeService {
 
     while(true) {
       try {
-        tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, this.tajoMasterAddress);
+        tajoMasterRpc = new AsyncRpcClient(TajoMasterProtocol.class, this.tajoMasterAddress);
         tajoMasterRpcClient = tajoMasterRpc.getStub();
         break;
       } catch (Exception e) {
@@ -378,8 +378,8 @@ public class TajoWorker extends CompositeService {
     }
 
     public void run() {
-      CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
-          new CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse>();
+      CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+          new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
       LOG.info("Worker Resource Heartbeat Thread start.");
       int sendDiskInfoCount = 0;
       int pullServerPort = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index dc27f1a..177e920 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -33,7 +33,7 @@ import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.master.querymaster.Query;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
 
@@ -46,7 +46,7 @@ public class TajoWorkerClientService extends AbstractService {
   private final PrimitiveProtos.BoolProto BOOL_FALSE =
           PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
 
-  private ProtoBlockingRpcServer rpcServer;
+  private BlockingRpcServer rpcServer;
   private InetSocketAddress bindAddr;
   private String addr;
   private int port;
@@ -76,7 +76,7 @@ public class TajoWorkerClientService extends AbstractService {
       }
 
       // TODO blocking/non-blocking??
-      this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
+      this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
       this.rpcServer.start();
 
       this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 76011dc..4011829 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -36,7 +36,7 @@ import org.apache.tajo.master.TaskSchedulerImpl;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.querymaster.QueryMaster;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
 
@@ -46,7 +46,7 @@ public class TajoWorkerManagerService extends CompositeService
     implements TajoWorkerProtocol.TajoWorkerProtocolService.Interface {
   private static final Log LOG = LogFactory.getLog(TajoWorkerManagerService.class.getName());
 
-  private ProtoAsyncRpcServer rpcServer;
+  private AsyncRpcServer rpcServer;
   private InetSocketAddress bindAddr;
   private String addr;
   private int port;
@@ -75,7 +75,7 @@ public class TajoWorkerManagerService extends CompositeService
         throw new IllegalArgumentException("Failed resolve of " + initIsa);
       }
 
-      this.rpcServer = new ProtoAsyncRpcServer(TajoWorkerProtocol.class, this, initIsa);
+      this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa);
       this.rpcServer.start();
 
       this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 9fa158a..b26c3f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -39,13 +39,12 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.ProtoAsyncRpcClient;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.*;
@@ -101,7 +100,7 @@ public class TaskRunner extends AbstractService {
   private String baseDir;
   private Path baseDirPath;
 
-  private ProtoAsyncRpcClient client;
+  private AsyncRpcClient client;
 
   private TaskRunnerManager taskRunnerManager;
 
@@ -137,10 +136,10 @@ public class TaskRunner extends AbstractService {
 
       // initialize MasterWorkerProtocol as an actual task owner.
       this.client =
-          taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
+          taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
             @Override
-            public ProtoAsyncRpcClient run() throws Exception {
-              return new ProtoAsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+            public AsyncRpcClient run() throws Exception {
+              return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
             }
           });
       this.master = client.getStub();
@@ -297,13 +296,13 @@ public class TaskRunner extends AbstractService {
         @Override
         public void run() {
           int receivedNum = 0;
-          CallFuture2<QueryUnitRequestProto> callFuture = null;
+          CallFuture<QueryUnitRequestProto> callFuture = null;
           QueryUnitRequestProto taskRequest = null;
 
           while(!stopped) {
             try {
               if (callFuture == null) {
-                callFuture = new CallFuture2<QueryUnitRequestProto>();
+                callFuture = new CallFuture<QueryUnitRequestProto>();
                 LOG.info("Request GetTask: " + getId());
                 GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
                     .setExecutionBlockId(executionBlockId.getProto())

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 4957bc4..2e5d2f1 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -83,8 +83,6 @@
               <arguments>
                 <argument>-Isrc/main/proto/</argument>
                 <argument>--java_out=target/generated-sources/proto</argument>
-                <argument>src/main/proto/ProtoParamRpcProtos.proto</argument>
-                <argument>src/main/proto/ProtoParamRpcProtocol.proto</argument>
                 <argument>src/main/proto/DummyProtos.proto</argument>
                 <argument>src/main/proto/RpcProtos.proto</argument>
                 <argument>src/main/proto/TestProtos.proto</argument>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
new file mode 100644
index 0000000..b062eca
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.*;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AsyncRpcClient extends NettyClientBase {
+  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
+
+  private final ChannelUpstreamHandler handler;
+  private final ChannelPipelineFactory pipeFactory;
+  private final ProxyRpcChannel rpcChannel;
+
+  private final AtomicInteger sequence = new AtomicInteger(0);
+  private final Map<Integer, ResponseCallback> requests =
+      new ConcurrentHashMap<Integer, ResponseCallback>();
+
+  private final Class<?> protocol;
+  private final Method stubMethod;
+
+  public AsyncRpcClient(final Class<?> protocol,
+                        final InetSocketAddress addr)
+      throws Exception {
+
+    this.protocol = protocol;
+    String serviceClassName = protocol.getName() + "$"
+        + protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
+
+    this.handler = new ClientChannelUpstreamHandler();
+    pipeFactory = new ProtoPipelineFactory(handler,
+        RpcResponse.getDefaultInstance());
+    super.init(addr, pipeFactory);
+    rpcChannel = new ProxyRpcChannel(getChannel());
+  }
+
+  public <T> T getStub() throws Exception {
+    return (T) stubMethod.invoke(null, rpcChannel);
+  }
+
+  public RpcChannel getRpcChannel() {
+    return this.rpcChannel;
+  }
+
+  private class ProxyRpcChannel implements RpcChannel {
+    private final Channel channel;
+    private final ClientChannelUpstreamHandler handler;
+    // Looks up the ClientChannelUpstreamHandler in the channel's pipeline; fails if it is absent.
+    public ProxyRpcChannel(Channel channel) {
+      this.channel = channel;
+      this.handler = channel.getPipeline()
+          .get(ClientChannelUpstreamHandler.class);
+
+      if (handler == null) {
+        throw new IllegalArgumentException("Channel does not have " +
+            "proper handler");
+      }
+    }
+    // Issues one call: takes the next sequence id, registers the callback, writes the request.
+    public void callMethod(final MethodDescriptor method,
+                           final RpcController controller,
+                           final Message param,
+                           final Message responseType,
+                           RpcCallback<Message> done) {
+
+      int nextSeqId = sequence.getAndIncrement();
+
+      Message rpcRequest = buildRequest(nextSeqId, method, param);
+      // The callback is registered under the sequence id before the request is written.
+      handler.registerCallback(nextSeqId,
+          new ResponseCallback(controller, responseType, done));
+      channel.write(rpcRequest);
+    }
+    // Builds the RpcRequest from the sequence id, the method name and the optional parameter.
+    private Message buildRequest(int seqId,
+                                 MethodDescriptor method,
+                                 Message param) {
+
+      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+          .setId(seqId)
+          .setMethodName(method.getName());
+
+      if (param != null) {
+          requestBuilder.setRequestMessage(param.toByteString());
+      }
+
+      return requestBuilder.build();
+    }
+  }
+
+  private class ResponseCallback implements RpcCallback<RpcResponse> {
+    private final RpcController controller;
+    private final Message responsePrototype;
+    private final RpcCallback<Message> callback;
+    // Holds the caller's controller, response prototype and callback for one pending request.
+    public ResponseCallback(RpcController controller,
+                            Message responsePrototype,
+                            RpcCallback<Message> callback) {
+      this.controller = controller;
+      this.responsePrototype = responsePrototype;
+      this.callback = callback;
+    }
+    // Turns the raw RpcResponse into the typed message and passes it to the caller's callback.
+    public void run(RpcResponse rpcResponse) {
+      // If hasErrorMessage is true, the call failed at the rpc level: the error text is set
+      // on the controller (when one was given), the callback IS still invoked, but with a
+      // null response, and a RemoteException is then thrown to whoever invoked run().
+      if (rpcResponse.hasErrorMessage()) {
+
+        if (controller != null) {
+          this.controller.setFailed(rpcResponse.getErrorMessage());
+        }
+        callback.run(null);
+        throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
+
+      } else { // the rpc call succeeded
+        // A response without a payload yields null; otherwise it is parsed via the prototype.
+        try {
+          Message responseMessage;
+          if (!rpcResponse.hasResponseMessage()) {
+            responseMessage = null;
+          } else {
+            responseMessage = responsePrototype.newBuilderForType().mergeFrom(
+                rpcResponse.getResponseMessage()).build();
+          }
+
+          callback.run(responseMessage);
+
+        } catch (InvalidProtocolBufferException e) {
+          throw new RemoteException(getErrorMessage(""), e);
+        }
+      }
+    }
+  }
+
+  private String getErrorMessage(String message) {
+    return "Exception [" + protocol.getCanonicalName() +
+        "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+        getChannel().getRemoteAddress()) + ")]: " + message;
+  }
+
+  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+    // Stores the callback under its sequence id; an id that is already pending is rejected.
+    synchronized void registerCallback(int seqId, ResponseCallback callback) {
+
+      if (requests.containsKey(seqId)) {
+        throw new RemoteException(
+            getErrorMessage("Duplicate Sequence Id "+ seqId));
+      }
+
+      requests.put(seqId, callback);
+    }
+    // Removes the callback registered for the response's id and runs it with the response.
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+
+      RpcResponse response = (RpcResponse) e.getMessage();
+      ResponseCallback callback = requests.remove(response.getId());
+      // No registered callback for this id: only a warning is logged.
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+      } else {
+        callback.run(response);
+      }
+    }
+    // Logs the failure, closes the channel and rethrows the cause wrapped in a RemoteException.
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      LOG.error(addr + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
+      e.getChannel().close();
+      throw new RemoteException(getErrorMessage(addr.toString()), e.getCause());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
new file mode 100644
index 0000000..5cf830e
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+import org.jboss.netty.channel.*;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+public class AsyncRpcServer extends NettyServerBase {
+  private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
+
+  private final Service service;
+  private final ChannelPipelineFactory pipeline;
+
+  public AsyncRpcServer(final Class<?> protocol,
+                        final Object instance,
+                        final InetSocketAddress bindAddress)
+      throws Exception {
+    super(protocol.getSimpleName(), bindAddress);
+
+    String serviceClassName = protocol.getName() + "$" +
+        protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface");
+    Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
+    this.service = (Service) method.invoke(null, instance);
+
+    ServerHandler handler = new ServerHandler();
+    this.pipeline = new ProtoPipelineFactory(handler,
+        RpcRequest.getDefaultInstance());
+    super.init(this.pipeline);
+  }
+
+  private class ServerHandler extends SimpleChannelUpstreamHandler {
+    // Resolves the requested method by name, decodes its parameter and invokes the service.
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+
+      final RpcRequest request = (RpcRequest) e.getMessage();
+
+      String methodName = request.getMethodName();
+      MethodDescriptor methodDescriptor = service.getDescriptorForType().
+          findMethodByName(methodName);
+      // An unknown method name is reported as a RemoteCallException carrying the request id.
+      if (methodDescriptor == null) {
+        throw new RemoteCallException(request.getId(),
+            new NoSuchMethodException(methodName));
+      }
+      // The parameter is optional: it stays null when the request carries no message.
+      Message paramProto = null;
+      if (request.hasRequestMessage()) {
+        try {
+          paramProto = service.getRequestPrototype(methodDescriptor)
+                  .newBuilderForType().mergeFrom(request.getRequestMessage()).
+                  build();
+        } catch (Throwable t) {
+          throw new RemoteCallException(request.getId(), methodDescriptor, t);
+        }
+      }
+
+      final Channel channel = e.getChannel();
+      final RpcController controller = new NettyRpcController();
+      // A request without an id gets a null callback, so this handler builds no response for it.
+      RpcCallback<Message> callback =
+          !request.hasId() ? null : new RpcCallback<Message>() {
+        // Builds the RpcResponse (payload and/or controller error text) and writes it back.
+        public void run(Message returnValue) {
+
+          RpcResponse.Builder builder = RpcResponse.newBuilder()
+              .setId(request.getId());
+
+          if (returnValue != null) {
+            builder.setResponseMessage(returnValue.toByteString());
+          }
+
+          if (controller.failed()) {
+            builder.setErrorMessage(controller.errorText());
+          }
+
+          channel.write(builder.build());
+        }
+      };
+
+      service.callMethod(methodDescriptor, controller, paramProto, callback);
+    }
+    // For a RemoteCallException, its response is written to the channel before rethrowing.
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception{
+      if (e.getCause() instanceof RemoteCallException) {
+        RemoteCallException callException = (RemoteCallException) e.getCause();
+        e.getChannel().write(callException.getResponse());
+      }
+      throw new RemoteException(e.getCause());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
new file mode 100644
index 0000000..4989c2d
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.*;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class BlockingRpcClient extends NettyClientBase {
+  private static final Log LOG = LogFactory.getLog(BlockingRpcClient.class);
+
+  private final ChannelUpstreamHandler handler;
+  private final ChannelPipelineFactory pipeFactory;
+  private final ProxyRpcChannel rpcChannel;
+
+  private final AtomicInteger sequence = new AtomicInteger(0);
+  private final Map<Integer, ProtoCallFuture> requests =
+      new ConcurrentHashMap<Integer, ProtoCallFuture>();
+
+  private final Class<?> protocol;
+  private final Method stubMethod;
+
+  public BlockingRpcClient(final Class<?> protocol,
+                           final InetSocketAddress addr)
+      throws Exception {
+
+    this.protocol = protocol;
+    String serviceClassName = protocol.getName() + "$"
+        + protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    stubMethod = serviceClass.getMethod("newBlockingStub",
+        BlockingRpcChannel.class);
+
+    this.handler = new ClientChannelUpstreamHandler();
+    pipeFactory = new ProtoPipelineFactory(handler,
+        RpcResponse.getDefaultInstance());
+    super.init(addr, pipeFactory);
+    rpcChannel = new ProxyRpcChannel(getChannel());
+  }
+
+  public <T> T getStub() throws Exception {
+    return (T) stubMethod.invoke(null, rpcChannel);
+  }
+
+  public BlockingRpcChannel getBlockingRpcChannel() {
+    return this.rpcChannel;
+  }
+
+  /** BlockingRpcChannel that writes each request to the Netty channel and waits for the reply. */
+  private class ProxyRpcChannel implements BlockingRpcChannel {
+    private final Channel channel;
+    private final ClientChannelUpstreamHandler handler;
+
+    public ProxyRpcChannel(Channel channel) {
+      this.channel = channel;
+      this.handler = channel.getPipeline().
+          get(ClientChannelUpstreamHandler.class);
+      if (handler == null) {
+        throw new IllegalArgumentException("Channel does not have " +
+            "proper handler");
+      }
+    }
+    /** Sends the call and blocks until its future is completed; failures become RemoteException. */
+    public Message callBlockingMethod(final MethodDescriptor method,
+                                      final RpcController controller,
+                                      final Message param,
+                                      final Message responsePrototype)
+        throws ServiceException {
+      int nextSeqId = sequence.getAndIncrement();
+      Message rpcRequest = buildRequest(nextSeqId, method, param);
+      ProtoCallFuture callFuture =
+          new ProtoCallFuture(controller, responsePrototype);
+      requests.put(nextSeqId, callFuture);
+      channel.write(rpcRequest);
+
+      try {
+        return callFuture.get();
+      } catch (InterruptedException ie) {
+        // Keep the interrupt visible to the caller and forget the abandoned call.
+        Thread.currentThread().interrupt();
+        requests.remove(nextSeqId);
+        throw new RemoteException(ie);
+      } catch (Throwable t) {
+        throw new RemoteException(t);
+      }
+    }
+    /** Builds the RpcRequest from the sequence id, the method name and the optional parameter. */
+    private Message buildRequest(int seqId,
+                                 MethodDescriptor method,
+                                 Message param) {
+      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+          .setId(seqId)
+          .setMethodName(method.getName());
+      if (param != null) {
+        requestBuilder.setRequestMessage(param.toByteString());
+      }
+      return requestBuilder.build();
+    }
+  }
+
+  private String getErrorMessage(String message) {
+    if(protocol != null && getChannel() != null) {
+      return "Exception [" + protocol.getCanonicalName() +
+          "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
+          getChannel().getRemoteAddress()) + ")]: " + message;
+    } else {
+      return "Exception " + message;
+    }
+  }
+
+  /** Completes the pending ProtoCallFuture whose sequence id matches each incoming response. */
+  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse rpcResponse = (RpcResponse) e.getMessage();
+      ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+        return;
+      }
+      if (rpcResponse.hasErrorMessage()) {
+        // Always wake the caller: a call made without a controller would otherwise block forever.
+        if (callback.controller != null) {
+          callback.setFailed(rpcResponse.getErrorMessage());
+        } else {
+          callback.setResponse(null);
+        }
+        throw new RemoteException(
+            getErrorMessage(rpcResponse.getErrorMessage()));
+      }
+      Message responseMessage = null;
+      try {
+        if (rpcResponse.hasResponseMessage()) {
+          responseMessage = callback.returnType.newBuilderForType().
+              mergeFrom(rpcResponse.getResponseMessage()).build();
+        }
+      } finally {
+        // Release the caller (with null) even when the payload cannot be parsed.
+        callback.setResponse(responseMessage);
+      }
+    }
+
+    /** Closes the channel and rethrows the cause wrapped in a RemoteException. */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      e.getChannel().close();
+      throw new RemoteException(getErrorMessage(""), e.getCause());
+    }
+  }
+
+  class ProtoCallFuture implements Future<Message> {
+    private Semaphore sem = new Semaphore(0);
+    private Message response = null;
+    private Message returnType;
+
+    private RpcController controller;
+
+    public ProtoCallFuture(RpcController controller, Message message) {
+      this.controller = controller;
+      this.returnType = message;
+    }
+
+    @Override
+    public boolean cancel(boolean arg0) {
+      return false;
+    }
+
+    @Override
+    public Message get() throws InterruptedException, ExecutionException {
+      sem.acquire();
+      return response;
+    }
+
+    @Override
+    public Message get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if(sem.tryAcquire(timeout, unit)) {
+        return response;
+      } else {
+        throw new TimeoutException();
+      }
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return sem.availablePermits() > 0;
+    }
+
+    public void setResponse(Message response) {
+      this.response = response;
+      sem.release();
+    }
+
+    public void setFailed(String errorText) {
+      this.controller.setFailed(errorText);
+      sem.release();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
new file mode 100644
index 0000000..3649c5e
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.*;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+public class BlockingRpcServer extends NettyServerBase {
+  private static Log LOG = LogFactory.getLog(BlockingRpcServer.class);
+  private final BlockingService service;
+  private final ChannelPipelineFactory pipeline;
+
+  public BlockingRpcServer(final Class<?> protocol,
+                           final Object instance,
+                           final InetSocketAddress bindAddress)
+      throws Exception {
+
+    super(protocol.getSimpleName(), bindAddress);
+
+    String serviceClassName = protocol.getName() + "$" +
+        protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    Class<?> interfaceClass = Class.forName(serviceClassName +
+        "$BlockingInterface");
+    Method method = serviceClass.getMethod(
+        "newReflectiveBlockingService", interfaceClass);
+
+    this.service = (BlockingService) method.invoke(null, instance);
+    this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
+        RpcRequest.getDefaultInstance());
+
+    super.init(this.pipeline);
+  }
+
+  private class ServerHandler extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      final RpcRequest request = (RpcRequest) e.getMessage();
+
+      String methodName = request.getMethodName();
+      MethodDescriptor methodDescriptor =
+          service.getDescriptorForType().findMethodByName(methodName);
+
+      if (methodDescriptor == null) {
+        throw new RemoteCallException(request.getId(),
+            new NoSuchMethodException(methodName));
+      }
+      Message paramProto = null;
+      if (request.hasRequestMessage()) {
+        try {
+          paramProto = service.getRequestPrototype(methodDescriptor)
+              .newBuilderForType().mergeFrom(request.getRequestMessage()).
+                  build();
+
+        } catch (Throwable t) {
+          throw new RemoteCallException(request.getId(), methodDescriptor, t);
+        }
+      }
+      Message returnValue;
+      RpcController controller = new NettyRpcController();
+
+      try {
+        returnValue = service.callBlockingMethod(methodDescriptor,
+            controller, paramProto);
+      } catch (Throwable t) {
+        throw new RemoteCallException(request.getId(), methodDescriptor, t);
+      }
+
+      RpcResponse.Builder builder =
+          RpcResponse.newBuilder().setId(request.getId());
+
+      if (returnValue != null) {
+        builder.setResponseMessage(returnValue.toByteString());
+      }
+
+      if (controller.failed()) {
+        builder.setErrorMessage(controller.errorText());
+      }
+      e.getChannel().write(builder.build());
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+      if (e.getCause() instanceof RemoteCallException) {
+        RemoteCallException callException = (RemoteCallException) e.getCause();
+        e.getChannel().write(callException.getResponse());
+      }
+
+      throw new RemoteException(e.getCause());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
index 4a000ad..0a9adab 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
@@ -18,58 +18,57 @@
 
 package org.apache.tajo.rpc;
 
-import java.util.concurrent.*;
+import com.google.protobuf.RpcCallback;
 
-@Deprecated
-class CallFuture implements Future<Object> {
-  private Semaphore sem = new Semaphore(0);
-  private Object response = null;
-  @SuppressWarnings("rawtypes")
-  private Class returnType;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-  @SuppressWarnings("rawtypes")
-  public CallFuture(Class returnType) {
-    this.returnType = returnType;
+public class CallFuture<T> implements RpcCallback<T>, Future<T> {
+
+  private final Semaphore sem = new Semaphore(0);
+  private boolean done = false;
+  private T response;
+
+  @Override
+  public void run(T t) {
+    this.response = t;
+    done = true;
+    sem.release();
   }
 
-  @SuppressWarnings("rawtypes")
-  public Class getReturnType() {
-    return this.returnType;
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    // TODO - to be implemented
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public boolean cancel(boolean arg0) {
-    return false;
+  public boolean isCancelled() {
+    // TODO - to be implemented
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public Object get() throws InterruptedException, ExecutionException {
+  public boolean isDone() {
+    return done;
+  }
+
+  @Override
+  public T get() throws InterruptedException {
     sem.acquire();
+
     return response;
   }
 
   @Override
-  public Object get(long timeout, TimeUnit unit) throws InterruptedException,
-      ExecutionException, TimeoutException {
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, TimeoutException {
     if (sem.tryAcquire(timeout, unit)) {
       return response;
     } else {
       throw new TimeoutException();
     }
   }
-
-  @Override
-  public boolean isCancelled() {
-    return false;
-  }
-
-  @Override
-  public boolean isDone() {
-    return sem.availablePermits() > 0;
-  }
-
-  public void setResponse(Object response) {
-    this.response = response;
-    sem.release();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java
deleted file mode 100644
index ddff447..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture2.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public class CallFuture2<T> implements RpcCallback<T>, Future<T> {
-
-  private final Semaphore sem = new Semaphore(0);
-  private boolean done = false;
-  private T response;
-
-  @Override
-  public void run(T t) {
-    this.response = t;
-    done = true;
-    sem.release();
-  }
-
-  @Override
-  public boolean cancel(boolean mayInterruptIfRunning) {
-    // TODO - to be implemented
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    // TODO - to be implemented
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean isDone() {
-    return done;
-  }
-
-  @Override
-  public T get() throws InterruptedException {
-    sem.acquire();
-
-    return response;
-  }
-
-  @Override
-  public T get(long timeout, TimeUnit unit)
-      throws InterruptedException, TimeoutException {
-    if (sem.tryAcquire(timeout, unit)) {
-      return response;
-    } else {
-      throw new TimeoutException();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java
deleted file mode 100644
index 1a5c54d..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/Callback.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.util.concurrent.*;
-
-@Deprecated
-public class Callback<T> implements Future<T> {
-  public enum Status {
-    READY, SUCCESS, FAILURE
-  }
-
-  private Status status;
-  private Semaphore sem = new Semaphore(0);
-  private T result = null;
-  private RemoteException err;
-
-  public Callback() {
-    status = Status.READY;
-  }
-
-  public void onComplete(T response) {
-    status = Status.SUCCESS;
-    result = response;
-    sem.release();
-  }
-
-  public void onFailure(RemoteException error) {
-    status = Status.FAILURE;
-    result = null;
-    err = error;
-    sem.release();
-  }
-
-  @Override
-  public T get() throws InterruptedException, ExecutionException {
-    if (!didGetResponse()) {
-      sem.acquire();
-    }
-
-    if (isFailure()) {
-      throw err;
-    }
-    return result;
-  }
-
-  @Override
-  public T get(long timeout, TimeUnit unit) throws InterruptedException,
-      ExecutionException, TimeoutException {
-    if (!didGetResponse()) {
-      if (sem.tryAcquire(timeout, unit)) {
-        if (isFailure()) {
-          throw err;
-        }
-        return result;
-      } else {
-        throw new TimeoutException();
-      }
-    }
-    return result;
-  }
-
-  public boolean didGetResponse() {
-    return (status != Status.READY);
-  }
-
-  public boolean isSuccess() {
-    return (status == Status.SUCCESS);
-  }
-
-  public boolean isFailure() {
-    return (status == Status.FAILURE);
-  }
-
-  public String getErrorMessage() {
-    if (status == Status.SUCCESS) {
-      return "";
-    }
-    return err.getMessage();
-  }
-
-  @Override
-  public boolean cancel(boolean arg0) {
-    return false;
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return false;
-  }
-
-  @Override
-  public boolean isDone() {
-    return sem.availablePermits() > 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java
deleted file mode 100644
index c8ea055..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyAsyncRpcProxy.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Invocation;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Response;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@Deprecated
-public class NettyAsyncRpcProxy extends NettyClientBase {
-  private static Log LOG = LogFactory.getLog(NettyAsyncRpcProxy.class);
-
-  private final Class<?> protocol;
-  private final ClientHandler handler;
-  InetSocketAddress addr;
-
-  private final AtomicInteger sequence = new AtomicInteger(0);
-  private final Map<Integer, ResponseRpcCallback> requests =
-      new ConcurrentHashMap<Integer, ResponseRpcCallback>();
-
-  @SuppressWarnings("rawtypes")
-  private final Map<String, Class> returnTypeMap = new HashMap<String, Class>();
-
-  public NettyAsyncRpcProxy(Class<?> server, Class<?> client,
-                            InetSocketAddress addr) {
-    this.protocol = client;
-
-    this.handler = new ClientHandler();
-    ChannelPipelineFactory pipeFactory =
-        new ProtoPipelineFactory(handler, Response.getDefaultInstance());
-
-    for (Method method : server.getMethods()) {
-      returnTypeMap.put(method.getName(), method.getReturnType());
-    }
-
-    super.init(addr, pipeFactory);
-  }
-
-  public Object getProxy() {
-    return Proxy.newProxyInstance(protocol.getClassLoader(),
-        new Class[] { protocol }, new Invoker(getChannel()));
-  }
-
-  public class Invoker implements InvocationHandler {
-    private final Channel channel;
-
-    public Invoker(Channel channel) {
-      this.channel = channel;
-    }
-
-    @SuppressWarnings("rawtypes")
-    public Object invoke(Object proxy, Method method, Object[] args)
-        throws Throwable {
-
-      int seqId = sequence.incrementAndGet();
-      Invocation.Builder builder = Invocation.newBuilder();
-
-      if (args != null) {
-        for (int i = 1; i < args.length; i++) {
-          ByteString str = ((Message) args[i]).toByteString();
-          builder.addParam(str);
-        }
-      }
-      Callback userCallBack = (Callback) args[0];
-      ResponseRpcCallback rpcCallback =
-          new ResponseRpcCallback(returnTypeMap.get(method.getName()),
-              userCallBack);
-
-      Invocation request =
-          builder.setId(seqId).setMethodName(method.getName()).build();
-      requests.put(seqId, rpcCallback);
-      this.channel.write(request);
-      return null;
-    }
-
-    public void shutdown() {
-      LOG.info("[RPC] Client terminates connection "
-          + channel.getRemoteAddress());
-      this.channel.close().awaitUninterruptibly();
-      bootstrap.releaseExternalResources();
-    }
-  }
-
-  private class ResponseRpcCallback implements RpcCallback<Response> {
-    @SuppressWarnings("rawtypes")
-    private final Callback callback;
-    private Response response;
-    private Class<?> retType;
-
-    @SuppressWarnings("rawtypes")
-    public ResponseRpcCallback(Class clazz, Callback callback) {
-      this.callback = callback;
-      this.retType = clazz;
-    }
-
-    @SuppressWarnings("unchecked")
-    public void run(Response message) {
-      response = message;
-
-      Object retObj = null;
-      if (response != null) {
-        if (!response.getHasReturn()) {
-          if (response.hasExceptionMessage()) {
-            callback.onFailure(new RemoteException(response
-                .getExceptionMessage()));
-            return;
-          }
-          retObj = null;
-        } else {
-          try {
-            Method mtd =
-                retType
-                    .getMethod("parseFrom", new Class[] { ByteString.class });
-            retObj = mtd.invoke(null, response.getReturnValue());
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-      }
-
-      callback.onComplete(retObj);
-    }
-  }
-
-  private class ClientHandler extends SimpleChannelUpstreamHandler {
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-      Response response = (Response) e.getMessage();
-      ResponseRpcCallback callback = requests.remove(response.getId());
-
-      if (callback == null) {
-        LOG.debug("dangling rpc call");
-      } else {
-        callback.run(response);
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      LOG.error("[RPC] ERROR " + e.getChannel().getRemoteAddress() + " "
-          + e.getCause());
-      e.getChannel().close();
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java
deleted file mode 100644
index 8954048..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyBlockingRpcProxy.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Invocation;
-import org.apache.tajo.rpc.ProtoParamRpcProtos.Response;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@Deprecated
-public class NettyBlockingRpcProxy extends NettyClientBase {
-
-  private static Log LOG = LogFactory.getLog(NettyBlockingRpcProxy.class);
-
-  private final Class<?> protocol;
-  private final ChannelPipelineFactory pipeFactory;
-  private final ClientHandler handler;
-  private final AtomicInteger sequence = new AtomicInteger(0);
-  private Map<Integer, CallFuture> requests =
-      new ConcurrentHashMap<Integer, CallFuture>();
-
-  public NettyBlockingRpcProxy(Class<?> protocol, InetSocketAddress addr) {
-    this.protocol = protocol;
-    this.handler = new ClientHandler();
-    this.pipeFactory =
-        new ProtoPipelineFactory(handler, Response.getDefaultInstance());
-    super.init(addr, pipeFactory);
-  }
-
-  public Object getProxy() {
-    return Proxy.newProxyInstance(protocol.getClassLoader(),
-        new Class[] { protocol }, new Invoker(getChannel()));
-  }
-
-  public String getExceptionMessage() {
-    return handler.getExceptionMessage();
-  }
-
-  public class Invoker implements InvocationHandler {
-    private final Channel channel;
-
-    public Invoker(Channel channel) {
-      this.channel = channel;
-    }
-
-    public Object invoke(Object proxy, Method method, Object[] args)
-        throws Throwable {
-
-      int nextSeqId = sequence.incrementAndGet();
-
-      Invocation.Builder builder = Invocation.newBuilder();
-
-      if (args != null) {
-        for (int i = 0; i < args.length; i++) {
-          ByteString str = ((Message) args[i]).toByteString();
-          builder.addParam(str);
-        }
-      }
-
-      Invocation request =
-          builder.setId(nextSeqId).setMethodName(method.getName()).build();
-
-      CallFuture callFuture = new CallFuture(method.getReturnType());
-      requests.put(nextSeqId, callFuture);
-      this.channel.write(request);
-      Object retObj = callFuture.get();
-      String exceptionMessage = handler.getExceptionMessage();
-
-      if (exceptionMessage == "") {
-        return retObj;
-      } else {
-        throw new RemoteException(exceptionMessage);
-      }
-    }
-
-    public void shutdown() {
-      LOG.info("[RPC] Client terminates connection "
-          + channel.getRemoteAddress());
-      this.channel.close().awaitUninterruptibly();
-      bootstrap.releaseExternalResources();
-    }
-  }
-
-  private class ClientHandler extends SimpleChannelUpstreamHandler {
-    private String exceptionMessage = "";
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-      Response response = (Response) e.getMessage();
-      CallFuture callFuture = requests.get(response.getId());
-
-      Object r = null;
-      if (response != null) {
-        if (!response.getHasReturn()) {
-          if (response.hasExceptionMessage()) {
-            this.exceptionMessage = response.getExceptionMessage();
-          }
-          response = null;
-        } else {
-          @SuppressWarnings("unchecked")
-          Method mtd =
-              callFuture.getReturnType().getMethod("parseFrom",
-                  new Class[] { ByteString.class });
-          r = mtd.invoke(null, response.getReturnValue());
-
-        }
-      }
-
-      if (callFuture == null) {
-        LOG.debug("dangling rpc call");
-      } else {
-        callFuture.setResponse(r);
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-        throws Exception {
-      LOG.error("[RPC] ERROR " + e.getChannel().getRemoteAddress() + " "
-          + e.getCause());
-      e.getChannel().close();
-    }
-
-    public String getExceptionMessage() {
-      return this.exceptionMessage;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fd60ec39/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java
deleted file mode 100644
index a0fd9a2..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpc.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.net.InetSocketAddress;
-
-public class NettyRpc {
-  @Deprecated
-  public static NettyRpcServer getProtoParamRpcServer(Object instance,
-      Class<?> interfaceClass, InetSocketAddress addr) {
-    return new NettyRpcServer(instance, interfaceClass, addr);
-  }
-
-  @Deprecated
-  public static Object getProtoParamAsyncRpcProxy(Class<?> serverClass,
-      Class<?> clientClass, InetSocketAddress addr) {
-    return new NettyAsyncRpcProxy(serverClass, clientClass, addr)
-        .getProxy();
-  }
-
-  @Deprecated
-  public static Object getProtoParamBlockingRpcProxy(Class<?> protocol,
-      InetSocketAddress addr) {
-    return new NettyBlockingRpcProxy(protocol, addr).getProxy();
-  }
-}