You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/18 18:25:47 UTC

[08/13] tajo git commit: TAJO-1337: Implements common modules to handle RESTful API

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
new file mode 100644
index 0000000..fb1cec2
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Template for one RPC invocation against a remote server.
+ *
+ * <p>Subclasses implement {@link #call(NettyClientBase)}; {@link #withRetries()} and
+ * {@link #withoutRetries()} obtain a client from the connection pool, run the call, record
+ * start/end timestamps, and finally release (or close, when {@code closeConn} is set) the
+ * connection.
+ *
+ * @param <T> the type of the value produced by {@link #call(NettyClientBase)}
+ */
+public abstract class ServerCallable<T> {
+  protected InetSocketAddress addr;
+  protected long startTime;
+  protected long endTime;
+  protected Class<?> protocol;
+  protected boolean asyncMode;
+  protected boolean closeConn;
+  protected RpcConnectionPool connPool;
+
+  // Set by abort(); checked by withRetries() after an IOException to stop retrying.
+  // NOTE(review): not volatile -- if abort() is called from a different thread than the one
+  // running withRetries(), visibility of the write is not guaranteed. Confirm the usage.
+  boolean abort = false;
+
+  /**
+   * Performs the actual remote call.
+   *
+   * @param client the client obtained from the pool; {@link #withRetries()} passes
+   *               {@code null} when {@code addr} is {@code null}
+   * @return the result of the call
+   * @throws Exception if the call fails
+   */
+  public abstract T call(NettyClientBase client) throws Exception;
+
+  /**
+   * Creates a callable whose connection is released back to the pool after each call.
+   */
+  public ServerCallable(RpcConnectionPool connPool,  InetSocketAddress addr, Class<?> protocol, boolean asyncMode) {
+    this(connPool, addr, protocol, asyncMode, false);
+  }
+
+  /**
+   * @param connPool  pool used to obtain and to release/close the client
+   * @param addr      address passed to the pool when requesting a connection
+   * @param protocol  protocol class passed to the pool when requesting a connection
+   * @param asyncMode passed to the pool when requesting a connection
+   * @param closeConn if {@code true} the connection is closed after the call instead of released
+   */
+  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol,
+                        boolean asyncMode, boolean closeConn) {
+    this.connPool = connPool;
+    this.addr = addr;
+    this.protocol = protocol;
+    this.asyncMode = asyncMode;
+    this.closeConn = closeConn;
+  }
+
+  /** Records the wall-clock time (ms) at which an attempt starts. */
+  public void beforeCall() {
+    this.startTime = System.currentTimeMillis();
+  }
+
+  public long getStartTime(){
+    return startTime;
+  }
+
+  /** Records the wall-clock time (ms) at which an attempt ends. */
+  public void afterCall() {
+    this.endTime = System.currentTimeMillis();
+  }
+
+  public long getEndTime(){
+    return endTime;
+  }
+
+  /** Requests that {@link #withRetries()} stop retrying after the next I/O failure. */
+  public void abort() {
+    abort = true;
+  }
+
+  /**
+   * Runs this instance, retrying on {@link IOException}.
+   *
+   * <p>Up to three attempts are made. After a failed attempt that will be retried, the thread
+   * sleeps {@code 500 * attemptNumber} milliseconds. Any failure other than an
+   * {@link IOException} is wrapped in a {@link ServiceException} and thrown immediately. When
+   * {@code addr} is {@code null} no connection is requested and {@code call} receives
+   * {@code null}.
+   *
+   * @return an object of type T
+   * @throws com.google.protobuf.ServiceException if the call fails with a non-I/O error, if an
+   *         I/O error occurs after {@link #abort()} was requested or on the last attempt, or if
+   *         the thread is interrupted while waiting to retry
+   */
+  public T withRetries() throws ServiceException {
+    //TODO configurable
+    final long pause = 500; //ms
+    final int numRetries = 3;
+
+    for (int tries = 0; tries < numRetries; tries++) {
+      NettyClientBase client = null;
+      try {
+        beforeCall();
+        if(addr != null) {
+          client = connPool.getConnection(addr, protocol, asyncMode);
+        }
+        return call(client);
+      } catch (IOException ioe) {
+        if(abort) {
+          throw new ServiceException(ioe.getMessage(), ioe);
+        }
+        if (tries == numRetries - 1) {
+          throw new ServiceException("Giving up after tries=" + tries, ioe);
+        }
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      } finally {
+        afterCall();
+        releaseOrClose(client);
+      }
+      try {
+        // Linear back-off: wait longer after each failed attempt.
+        Thread.sleep(pause * (tries + 1));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ServiceException("Giving up after tries=" + tries, e);
+      }
+    }
+    // Not reached in practice: the last iteration either returns or throws.
+    return null;
+  }
+
+  /**
+   * Run this instance against the server once.
+   *
+   * <p>A failure is first unwrapped by {@code translateException}; an {@link IOException} is
+   * rethrown as is, anything else is wrapped in a new {@link RuntimeException}.
+   *
+   * @return an object of type T
+   * @throws java.io.IOException if a remote or network exception occurs
+   * @throws RuntimeException other unspecified error
+   */
+  public T withoutRetries() throws IOException, RuntimeException {
+    NettyClientBase client = null;
+    try {
+      beforeCall();
+      client = connPool.getConnection(addr, protocol, asyncMode);
+      return call(client);
+    } catch (Throwable t) {
+      Throwable t2 = translateException(t);
+      if (t2 instanceof IOException) {
+        throw (IOException)t2;
+      } else {
+        throw new RuntimeException(t2);
+      }
+    } finally {
+      afterCall();
+      releaseOrClose(client);
+    }
+  }
+
+  /** Closes the connection when {@code closeConn} is set, otherwise releases it to the pool. */
+  private void releaseOrClose(NettyClientBase client) {
+    if(closeConn) {
+      connPool.closeConnection(client);
+    } else {
+      connPool.releaseConnection(client);
+    }
+  }
+
+  /**
+   * Strips one {@link UndeclaredThrowableException} layer and then one {@code RemoteException}
+   * layer, in that order. A wrapper without a cause is returned unchanged so that the original
+   * failure is never replaced by {@code null}.
+   */
+  private static Throwable translateException(Throwable t) {
+    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
+      t = t.getCause();
+    }
+    if (t instanceof RemoteException && t.getCause() != null) {
+      t = t.getCause();
+    }
+    return t;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
new file mode 100644
index 0000000..113d181
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+public class TajoServiceException extends ServiceException {
+  private String traceMessage;
+  private String protocol;
+  private String remoteAddress;
+
+  public TajoServiceException(String message) {
+    super(message);
+  }
+  public TajoServiceException(String message, String traceMessage) {
+    super(message);
+    this.traceMessage = traceMessage;
+  }
+
+  public TajoServiceException(String message, Throwable cause, String protocol, String remoteAddress) {
+    super(message, cause);
+
+    this.protocol = protocol;
+    this.remoteAddress = remoteAddress;
+  }
+
+  public String getTraceMessage() {
+    if(traceMessage == null && getCause() != null){
+      this.traceMessage = ExceptionUtils.getStackTrace(getCause());
+    }
+    return traceMessage;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public String getRemoteAddress() {
+    return remoteAddress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto
new file mode 100644
index 0000000..f53f0d6
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2012 Database Lab., Korea Univ.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.rpc.test";
+option java_outer_classname = "DummyProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Message with two required 32-bit integer fields.
+// NOTE(review): presumably the operands of a multiplication test call; no service
+// visible in this commit section references it -- confirm.
+message MulRequest1 {
+	required int32 x1 = 1;
+	required int32 x2 = 2;
+}
+
+// Message with two required 32-bit integer fields; its field layout is identical to
+// MulRequest1.
+message MulRequest2 {
+	required int32 x1 = 1;
+	required int32 x2 = 2;
+}
+
+// Message with two required 32-bit integer result fields.
+// NOTE(review): presumably one result per MulRequest1/MulRequest2 -- confirm.
+message MulResponse {
+	required int32 result1 = 1;
+	required int32 result2 = 2;
+}
+
+// Element type used by InnerRequest and InnerResponse; holds one required string.
+message InnerNode {
+	required string instr = 1;
+}
+
+// Request carrying zero or more InnerNode entries.
+message InnerRequest {
+	repeated InnerNode nodes = 1;
+}
+
+// Response carrying zero or more InnerNode entries.
+message InnerResponse {
+	repeated InnerNode nodes = 1;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto
new file mode 100644
index 0000000..69f43ed
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2012 Database Lab., Korea Univ.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.rpc";
+option java_outer_classname = "RpcProtos";
+
+// Envelope for one RPC invocation: a required integer id, the required name of the
+// method to invoke, and optional raw bytes for the request message.
+// NOTE(review): the id is presumably what pairs a request with its RpcResponse (which
+// also carries an id) -- confirm against the client/server code.
+message RpcRequest {
+  required int32 id = 1;
+  required string method_name = 2;
+  optional bytes request_message = 3;
+}
+
+// Envelope for one RPC reply: a required integer id, optional raw bytes for the response
+// message, and three optional strings describing an error (class, message, trace).
+// NOTE(review): presumably the error_* fields are set only when the call failed -- confirm.
+message RpcResponse {
+  required int32 id = 1;
+  optional bytes response_message = 2;
+  optional string error_class = 3;
+  optional string error_message = 4;
+  optional string error_trace = 5;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto
new file mode 100644
index 0000000..58640ea
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2012 Database Lab., Korea Univ.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.rpc.test";
+option java_outer_classname = "DummyProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "TestProtos.proto";
+
+// Service used by the RPC tests. Every method except sum takes and returns an EchoMessage.
+// NOTE(review): "deley" is presumably a misspelling of "delay". The generated stub method
+// carries the same spelling, so renaming it requires updating every caller.
+service DummyProtocolService {
+  rpc sum (SumRequest) returns (SumResponse);
+  rpc echo (EchoMessage) returns (EchoMessage);
+  rpc getError (EchoMessage) returns (EchoMessage);
+  rpc getNull (EchoMessage) returns (EchoMessage);
+  rpc deley (EchoMessage) returns (EchoMessage);
+  rpc throwException (EchoMessage) returns (EchoMessage);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto
new file mode 100644
index 0000000..5001c0e
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2012 Database Lab., Korea Univ.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.rpc.test";
+option java_outer_classname = "TestProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Message holding a single required string.
+message EchoMessage {
+  required string message = 1;
+}
+
+// Request with four required numeric fields, one each of int32, int64, double and float.
+// NOTE(review): presumably the server adds the four values -- confirm in the service impl.
+message SumRequest {
+  required int32 x1 = 1;
+  required int64 x2 = 2;
+  required double x3 = 3;
+  required float x4 = 4;
+}
+
+// Response holding a single required double result.
+message SumResponse {
+  required double result = 1;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties b/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties
new file mode 100644
index 0000000..2c4d991
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties
@@ -0,0 +1,25 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+# Repository-wide threshold; the key must be spelled "threshold" for log4j to honor it.
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p: %c (%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
new file mode 100644
index 0000000..a974a65
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link AsyncRpcClient} talking to an {@link AsyncRpcServer} that serves
+ * {@link DummyProtocolAsyncImpl}.
+ *
+ * <p>A JUnit rule starts a server and a client before each test unless the test method opts
+ * out through {@link SetupRpcConnection}. Whatever client/server ends up in the
+ * {@code client}/{@code server} fields is always torn down after the test.
+ */
+public class TestAsyncRpc {
+  private static final Log LOG = LogFactory.getLog(TestAsyncRpc.class);
+  private static final String MESSAGE = "TestAsyncRpc";
+
+  double sum;
+  String echo;
+
+  AsyncRpcServer server;
+  AsyncRpcClient client;
+  Interface stub;
+  DummyProtocolAsyncImpl service;
+  int retries;
+
+  /**
+   * Per-test switch for the automatic setup done by the rule below. A test method without
+   * this annotation gets both a server and a client.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @interface SetupRpcConnection {
+    boolean setupRpcServer() default true;
+    boolean setupRpcClient() default true;
+  }
+
+  @Rule
+  public ExternalResource resource = new ExternalResource() {
+
+    private Description description;
+
+    @Override
+    public Statement apply(Statement base, Description description) {
+      // Remember the test description so before() can read the method's annotation.
+      this.description = description;
+      return super.apply(base, description);
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+        setUpRpcServer();
+      }
+
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+        setUpRpcClient();
+      }
+    }
+
+    @Override
+    protected void after() {
+      // Always tear down: tests that opt out of the automatic setup still store their own
+      // client and/or server in the same fields, and both tearDown methods are null-safe.
+      try {
+        tearDownRpcClient();
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+
+      try {
+        tearDownRpcServer();
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    }
+
+  };
+
+  /** Starts a server on 127.0.0.1 with port 0. */
+  public void setUpRpcServer() throws Exception {
+    service = new DummyProtocolAsyncImpl();
+    server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), 2);
+    server.start();
+  }
+
+  /** Creates a client for the running server, acquires it and fetches its stub. */
+  public void setUpRpcClient() throws Exception {
+    retries = 1;
+
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(
+              RpcUtils.getConnectAddress(server.getListenAddress()),
+              DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT);
+    stub = client.getStub();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    RpcChannelFactory.shutdownGracefully();
+  }
+
+  /** Shuts the server down if one is set; safe to call repeatedly. */
+  public void tearDownRpcServer() throws Exception {
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+  }
+
+  /** Closes the client if one is set; safe to call repeatedly. */
+  public void tearDownRpcClient() throws Exception {
+    if(client != null) {
+      client.close();
+      client = null;
+    }
+  }
+
+  boolean calledMarker = false;
+
+  @Test
+  public void testRpc() throws Exception {
+
+    SumRequest sumRequest = SumRequest.newBuilder()
+        .setX1(1)
+        .setX2(2)
+        .setX3(3.15d)
+        .setX4(2.0f).build();
+
+    stub.sum(null, sumRequest, new RpcCallback<SumResponse>() {
+      @Override
+      public void run(SumResponse parameter) {
+        sum = parameter.getResult();
+        assertTrue(8.15d == sum);
+      }
+    });
+
+
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    // The latch replaces a fixed sleep: the test continues as soon as the callback has run
+    // and the write to calledMarker is safely visible to this thread.
+    final CountDownLatch echoLatch = new CountDownLatch(1);
+    RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
+      @Override
+      public void run(EchoMessage parameter) {
+        echo = parameter.getMessage();
+        assertEquals(MESSAGE, echo);
+        calledMarker = true;
+        echoLatch.countDown();
+      }
+    };
+    stub.echo(null, echoMessage, callback);
+    assertTrue(echoLatch.await(1000, TimeUnit.MILLISECONDS));
+    assertTrue(calledMarker);
+  }
+
+  private CountDownLatch testNullLatch;
+
+  @Test
+  public void testGetNull() throws Exception {
+    testNullLatch = new CountDownLatch(1);
+    stub.getNull(null, null, new RpcCallback<EchoMessage>() {
+      @Override
+      public void run(EchoMessage parameter) {
+        assertNull(parameter);
+        LOG.info("testGetNull retrieved");
+        testNullLatch.countDown();
+      }
+    });
+    assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS));
+    assertTrue(service.getNullCalled);
+  }
+
+  @Test
+  public void testCallFuture() throws Exception {
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.deley(null, echoMessage, future);
+
+    assertFalse(future.isDone());
+    assertEquals(echoMessage, future.get());
+    assertTrue(future.isDone());
+  }
+
+  @Test
+  public void testCallFutureTimeout() throws Exception {
+    boolean timeout = false;
+    try {
+      EchoMessage echoMessage = EchoMessage.newBuilder()
+          .setMessage(MESSAGE).build();
+      CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+      stub.deley(null, echoMessage, future);
+
+      assertFalse(future.isDone());
+      future.get(1, TimeUnit.SECONDS);
+    } catch (TimeoutException te) {
+      timeout = true;
+    }
+    assertTrue(timeout);
+  }
+
+  @Test
+  public void testCallFutureDisconnected() throws Exception {
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+    // Stop the server first so the call below is issued against a dead peer.
+    tearDownRpcServer();
+
+    stub.echo(future.getController(), echoMessage, future);
+    EchoMessage response = future.get();
+
+    assertNull(response);
+    assertTrue(future.getController().failed());
+    assertNotNull(future.getController().errorText());
+  }
+
+  @Test
+  public void testStubDisconnected() throws Exception {
+
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+    if (server != null) {
+      server.shutdown(true);
+      server = null;
+    }
+
+    // Unlike testCallFutureDisconnected, the stub is fetched after the server is gone.
+    stub = client.getStub();
+    stub.echo(future.getController(), echoMessage, future);
+    EchoMessage response = future.get();
+
+    assertNull(response);
+    assertTrue(future.getController().failed());
+    assertNotNull(future.getController().errorText());
+  }
+
+  @Test
+  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
+  public void testConnectionRetry() throws Exception {
+    retries = 10;
+    // Bind and immediately close a socket to pick a port for the late-starting server.
+    ServerSocket serverSocket = new ServerSocket(0);
+    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+    serverSocket.close();
+    service = new DummyProtocolAsyncImpl();
+
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+    //lazy startup
+    Thread serverThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(1000);
+          server = new AsyncRpcServer(DummyProtocol.class,
+              service, address, 2);
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+        server.start();
+      }
+    });
+    serverThread.start();
+
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    stub = client.getStub();
+    stub.echo(future.getController(), echoMessage, future);
+
+    assertFalse(future.isDone());
+    assertEquals(echoMessage, future.get());
+    assertTrue(future.isDone());
+  }
+
+  @Test
+  public void testConnectionFailure() throws Exception {
+    InetSocketAddress address = new InetSocketAddress("test", 0);
+    try {
+      RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
+      NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
+      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    } catch (AssertionError e) {
+      // Keep the original assertion failure instead of replacing it with a bare fail().
+      throw e;
+    } catch (Throwable throwable) {
+      fail("Unexpected failure: " + throwable);
+    }
+  }
+
+  @Test
+  @SetupRpcConnection(setupRpcClient=false)
+  public void testUnresolvedAddress() throws Exception {
+    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(
+              RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    Interface stub = client.getStub();
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.deley(null, echoMessage, future);
+
+    assertFalse(future.isDone());
+    assertEquals(echoMessage, future.get());
+    assertTrue(future.isDone());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
new file mode 100644
index 0000000..10dd766
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class TestBlockingRpc {
+  public static final String MESSAGE = "TestBlockingRpc";
+
+  private BlockingRpcServer server;
+  private BlockingRpcClient client;
+  private BlockingInterface stub;
+  private DummyProtocolBlockingImpl service;
+  private int retries;
+  
+  @Retention(RetentionPolicy.RUNTIME)   // read at run time by the rule through Description.getAnnotation()
+  @Target(ElementType.METHOD)
+  @interface SetupRpcConnection {
+    boolean setupRpcServer() default true;   // false: the rule skips setUpRpcServer() for the annotated test
+    boolean setupRpcClient() default true;   // false: the rule skips setUpRpcClient() for the annotated test
+  }
+  
+  /** Sets up the RPC server/client before each test, as selected by {@link SetupRpcConnection}. */
+  @Rule
+  public ExternalResource resource = new ExternalResource() {
+
+    private Description description;
+
+    @Override
+    public Statement apply(Statement base, Description description) {
+      // Keep the description: before() needs it to look up the test method's annotation.
+      this.description = description;
+      return super.apply(base, description);
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+        setUpRpcServer();
+      }
+
+      if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+        setUpRpcClient();
+      }
+    }
+
+    @Override
+    protected void after() {
+      // Tear down unconditionally. Tests that opt out of the automatic setup (for example
+      // testConnectionRetry) still store their own client/server in the fields, and both
+      // tearDown methods do nothing when the corresponding field is null.
+      try {
+        tearDownRpcClient();
+      } catch (Exception e) {
+        fail(e.getMessage());
+      } finally {
+        // Runs even if closing the client failed, so the server is not left running.
+        try {
+          tearDownRpcServer();
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+      }
+    }
+
+  };
+  
+  public void setUpRpcServer() throws Exception {
+    InetSocketAddress bindAddress = new InetSocketAddress("127.0.0.1", 0);  // port 0: any free port
+    service = new DummyProtocolBlockingImpl();
+    server = new BlockingRpcServer(DummyProtocol.class, service, bindAddress, 2);
+    server.start();
+  }
+  
+  public void setUpRpcClient() throws Exception {
+    retries = 1;
+    // The key targets the address the running test server reports as its listen address.
+    RpcConnectionPool.RpcConnectionKey connectionKey = new RpcConnectionPool.RpcConnectionKey(
+        RpcUtils.getConnectAddress(server.getListenAddress()), DummyProtocol.class, false);
+    client = new BlockingRpcClient(connectionKey, retries);
+
+    boolean acquired = client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT);
+    assertTrue(acquired);
+    stub = client.getStub();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    RpcChannelFactory.shutdownGracefully();   // runs once, after all tests of this class have finished
+  }
+  
+  public void tearDownRpcServer() throws Exception {
+    if(server != null) {   // nothing to do when no server is held in the field
+      server.shutdown();
+      server = null;       // cleared so that a repeated call does nothing
+    }
+  }
+  
+  public void tearDownRpcClient() throws Exception {
+    if(client != null) {   // nothing to do when no client is held in the field
+      client.close();
+      client = null;       // cleared so that a repeated call does nothing
+    }
+  }
+
+  /** Round-trips one sum() call and one echo() call through the blocking stub. */
+  @Test
+  public void testRpc() throws Exception {
+    // sum(): 1 + 2 + 3.15 + 2.0 is expected to come back as 8.15.
+    SumRequest.Builder sumBuilder = SumRequest.newBuilder();
+    sumBuilder.setX1(1).setX2(2).setX3(3.15d).setX4(2.0f);
+    SumResponse sumResponse = stub.sum(null, sumBuilder.build());
+    assertEquals(8.15d, sumResponse.getResult(), 1e-15);
+
+    // echo(): the reply must carry the message that was sent.
+    EchoMessage.Builder echoBuilder = EchoMessage.newBuilder();
+    echoBuilder.setMessage(MESSAGE);
+    EchoMessage echoed = stub.echo(null, echoBuilder.build());
+    assertEquals(MESSAGE, echoed.getMessage());
+  }
+
+  /** Calls sum() through ServerCallable, once via withRetries() and once via withoutRetries(). */
+  @Test
+  @SetupRpcConnection(setupRpcClient=false)
+  public void testRpcWithServiceCallable() throws Exception {
+    RpcConnectionPool pool = RpcConnectionPool.getPool();
+    final SumRequest request = SumRequest.newBuilder()
+        .setX1(1)
+        .setX2(2)
+        .setX3(3.15d)
+        .setX4(2.0f).build();
+
+    SumResponse response =
+    new ServerCallable<SumResponse>(pool,
+        server.getListenAddress(), DummyProtocol.class, false) {
+      @Override
+      public SumResponse call(NettyClientBase client) throws Exception {
+        BlockingInterface stub2 = client.getStub();
+        return stub2.sum(null, request);
+      }
+    }.withRetries();
+
+    assertEquals(8.15d, response.getResult(), 1e-15);
+
+    response =
+        new ServerCallable<SumResponse>(pool,
+            server.getListenAddress(), DummyProtocol.class, false) {
+          @Override
+          public SumResponse call(NettyClientBase client) throws Exception {
+            BlockingInterface stub2 = client.getStub();
+            return stub2.sum(null, request);
+          }
+        }.withoutRetries();
+
+    // Compare with a tolerance, like the first check: exact == on doubles is brittle.
+    assertEquals(8.15d, response.getResult(), 1e-15);
+    pool.close();
+  }
+
+  /** A ServiceException raised by the service must reach the caller as a TajoServiceException. */
+  @Test
+  public void testThrowException() throws Exception {
+    EchoMessage message = EchoMessage.newBuilder().setMessage(MESSAGE).build();
+    Throwable thrown = null;
+    try {
+      stub.throwException(null, message);
+    } catch (Throwable t) {
+      thrown = t;  // inspected after the try, so a "nothing was thrown" failure is not swallowed
+    }
+    assertTrue("RpcCall should throw TajoServiceException: " + thrown, thrown instanceof TajoServiceException);
+    assertEquals("Exception Test", thrown.getMessage());
+    TajoServiceException te = (TajoServiceException)thrown;
+    assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol());
+    assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(),
+        te.getRemoteAddress());
+  }
+
+  @Test
+  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
+  public void testConnectionRetry() throws Exception {
+    retries = 10;
+    ServerSocket serverSocket = new ServerSocket(0);
+    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+    serverSocket.close();
+    EchoMessage message = EchoMessage.newBuilder().setMessage(MESSAGE).build();
+    // Lazy startup: the server comes up about one second after the client starts connecting.
+    Thread serverThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(1000);
+          server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+        server.start();
+      }
+    });
+    serverThread.start();
+    try {
+      client = new BlockingRpcClient(
+          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false), retries);
+      assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+      stub = client.getStub();
+      assertEquals(MESSAGE, stub.echo(null, message).getMessage());
+    } finally {
+      // Both setup flags are false for this test, so release the client and server here.
+      serverThread.join();
+      tearDownRpcClient();
+      tearDownRpcServer();
+    }
+  }
+
+  /**
+   * acquire() must report failure (return false) when the connection cannot be established.
+   * NOTE(review): assumes no process listens on the port right above the test server's port.
+   */
+  @Test
+  public void testConnectionFailed() throws Exception {
+    int port = server.getListenAddress().getPort() + 1;
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionPool.RpcConnectionKey(
+            RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)),
+            DummyProtocol.class, false);
+    NettyClientBase client = new BlockingRpcClient(rpcConnectionKey, retries);
+    try {
+      // No catch block: an assertion or connection error reaches JUnit with its own message
+      // and stack trace instead of being replaced by an empty fail().
+      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    } finally {
+      client.close();
+    }
+  }
+
+  @Test
+  public void testGetNull() throws Exception {
+    assertNull(stub.getNull(null, null));   // the service returns null; the stub must hand it through
+    assertTrue(service.getNullCalled);      // flag set inside DummyProtocolBlockingImpl.getNull()
+  }
+
+  /**
+   * Shuts the server down while a slow deley() call is in flight and checks that the shutdown
+   * finishes in time and that the pending call does not report an error to the caller.
+   */
+  @Test
+  public void testShutdown() throws Exception {
+    final StringBuilder error = new StringBuilder();
+    Thread callThread = new Thread() {
+      public void run() {
+        try {
+          EchoMessage message = EchoMessage.newBuilder()
+              .setMessage(MESSAGE)
+              .build();
+          stub.deley(null, message);
+        } catch (Exception e) {
+          error.append(e.getMessage());
+        }
+      }
+    };
+
+    callThread.start();
+
+    final CountDownLatch latch = new CountDownLatch(1);
+    Thread shutdownThread = new Thread() {
+      public void run() {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+        try {
+          server.shutdown();
+          server = null;
+          latch.countDown();
+        } catch (Throwable e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    shutdownThread.start();
+
+    assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS));
+
+    // join() replaces the former wait()/notifyAll() pair: a notification sent before the test
+    // thread started waiting was lost and cost the full timeout. join() returns at once if the
+    // call thread is already done, and makes its writes to 'error' visible to this thread.
+    callThread.join(5 * 1000);
+
+    if(!error.toString().isEmpty()) {
+      fail(error.toString());
+    }
+  }
+
+  @Test
+  @SetupRpcConnection(setupRpcClient=false)
+  public void testUnresolvedAddress() throws Exception {
+    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey = new RpcConnectionPool.RpcConnectionKey(
+        RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
+    client = new BlockingRpcClient(rpcConnectionKey, retries);
+    try {
+      assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+      BlockingInterface unresolvedStub = client.getStub();  // local name no longer shadows the field
+      EchoMessage message = EchoMessage.newBuilder().setMessage(MESSAGE).build();
+      assertEquals(MESSAGE, unresolvedStub.echo(null, message).getMessage());
+    } finally {
+      tearDownRpcClient();  // setupRpcClient=false: this test owns the client it created
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
new file mode 100644
index 0000000..0ca7563
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc.test.impl;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+
+/** Async test service: every handler answers through the supplied RpcCallback. */
+public class DummyProtocolAsyncImpl implements Interface {
+  private static final Log LOG = LogFactory.getLog(DummyProtocolAsyncImpl.class);
+  public boolean getNullCalled = false;
+  public boolean getErrorCalled = false;
+
+  @Override
+  public void sum(RpcController controller, SumRequest request,
+                  RpcCallback<SumResponse> done) {
+    SumResponse response = SumResponse.newBuilder().setResult(
+        request.getX1()+request.getX2()+request.getX3()+request.getX4()
+    ).build();
+    done.run(response);
+  }
+
+  @Override
+  public void echo(RpcController controller, EchoMessage request,
+                   RpcCallback<EchoMessage> done) {
+    done.run(request);
+  }
+
+  @Override
+  public void getError(RpcController controller, EchoMessage request,
+                       RpcCallback<EchoMessage> done) {
+    LOG.info("noCallback is called");
+    getErrorCalled = true;
+    controller.setFailed(request.getMessage());
+    done.run(request);
+  }
+
+  @Override
+  public void getNull(RpcController controller, EchoMessage request,
+                      RpcCallback<EchoMessage> done) {
+    getNullCalled = true;
+    LOG.info("noCallback is called");
+    done.run(null);
+  }
+
+  @Override
+  public void deley(RpcController controller, EchoMessage request,
+                    RpcCallback<EchoMessage> done) {
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      LOG.error(e.getMessage());
+      // NOTE(review): the interrupt is only logged; the flag is not restored. Left as is because
+      // it is unclear whether the code that runs after done.run() tolerates a set flag.
+    }
+    done.run(request);
+  }
+
+  @Override
+  public void throwException(RpcController controller, EchoMessage request,
+                             RpcCallback<EchoMessage> done) {
+    done.run(request);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
new file mode 100644
index 0000000..8d4b597
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc.test.impl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
+import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
+import org.apache.tajo.rpc.test.TestProtos.SumRequest;
+import org.apache.tajo.rpc.test.TestProtos.SumResponse;
+
+/** Blocking test service; the public flags record which handlers have been invoked. */
+public class DummyProtocolBlockingImpl implements BlockingInterface {
+  private static final Log LOG = LogFactory.getLog(DummyProtocolBlockingImpl.class);
+  public boolean getNullCalled = false;
+  public boolean getErrorCalled = false;
+
+  @Override
+  public SumResponse sum(RpcController controller, SumRequest request)
+      throws ServiceException {
+    return SumResponse.newBuilder().setResult(
+        request.getX1()+request.getX2()+request.getX3()+request.getX4()
+    ).build();
+  }
+
+  @Override
+  public EchoMessage echo(RpcController controller, EchoMessage request)
+      throws ServiceException {
+    return EchoMessage.newBuilder().setMessage(request.getMessage()).build();
+  }
+
+  @Override
+  public EchoMessage getError(RpcController controller, EchoMessage request)
+      throws ServiceException {
+    getErrorCalled = true;
+    controller.setFailed(request.getMessage());
+    return request;
+  }
+
+  @Override
+  public EchoMessage getNull(RpcController controller, EchoMessage request)
+      throws ServiceException {
+    getNullCalled = true;
+    LOG.info("noCallback is called");
+    return null;
+  }
+
+  @Override
+  public EchoMessage deley(RpcController controller, EchoMessage request)
+      throws ServiceException {
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      // Deliberately ignored: an interrupted delay still answers with the request.
+    }
+
+    return request;
+  }
+
+  @Override
+  public EchoMessage throwException(RpcController controller, EchoMessage request)
+      throws ServiceException {
+    throw new ServiceException("Exception Test");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/pom.xml b/tajo-rpc/tajo-ws-rs/pom.xml
new file mode 100644
index 0000000..a87a67a
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/pom.xml
@@ -0,0 +1,218 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <groupId>org.apache.tajo</groupId>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <packaging>jar</packaging>
+  <artifactId>tajo-ws-rs</artifactId>
+  <name>Tajo RESTful Container</name>
+  <description>RESTful Container Implementation based on Netty</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+        </configuration>
+        <executions>
+          <execution>
+            <id>create-jar</id>
+            <phase>prepare-package</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-rpc-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>javax.ws.rs</groupId>
+      <artifactId>javax.ws.rs-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <target>
+                    <echo file="${project.build.directory}/dist-layout-stitching.sh">
+                      run() {
+                      echo "\$ ${@}"
+                      "${@}"
+                      res=$?
+                      if [ $res != 0 ]; then
+                      echo
+                      echo "Failed!"
+                      echo
+                      exit $res
+                      fi
+                      }
+
+                      ROOT=`cd ${basedir}/..;pwd`
+                      echo
+                      echo "Current directory `pwd`"
+                      echo
+                      run rm -rf ${project.artifactId}-${project.version}
+                      run mkdir ${project.artifactId}-${project.version}
+                      run cd ${project.artifactId}-${project.version}
+                      run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+                      echo
+                      echo "Tajo RESTful Container dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+                      echo
+                    </echo>
+                    <exec executable="sh" dir="${project.build.directory}" failonerror="true">
+                      <arg line="./dist-layout-stitching.sh" />
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java
new file mode 100644
index 0000000..a1ea72b
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+/**
+ * Default Channel Initializer for Netty Rest server.
+ */
+public class NettyRestChannelInitializer extends ChannelInitializer<Channel> {
+  /** Upper bound, in bytes, handed to HttpObjectAggregator as its maximum content length. */
+  private static final int MAX_CONTENT_LENGTH = 1 << 16;
+  private final ChannelHandler handler;
+
+  public NettyRestChannelInitializer(ChannelHandler handler) {
+    this.handler = handler;
+  }
+
+  @Override
+  protected void initChannel(Channel channel) throws Exception {
+    ChannelPipeline pipeline = channel.pipeline();
+    pipeline.addLast(new HttpServerCodec());
+    pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
+    pipeline.addLast(new ChunkedWriteHandler());
+    // NOTE(review): one handler instance is added to every channel - presumably it is @Sharable.
+    pipeline.addLast(handler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java
new file mode 100644
index 0000000..81d1eeb
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.Principal;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.*;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.SecurityContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.glassfish.hk2.api.ServiceLocator;
+import org.glassfish.jersey.internal.MapPropertiesDelegate;
+import org.glassfish.jersey.server.ApplicationHandler;
+import org.glassfish.jersey.server.ContainerException;
+import org.glassfish.jersey.server.ContainerRequest;
+import org.glassfish.jersey.server.ContainerResponse;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.server.internal.ConfigHelper;
+import org.glassfish.jersey.server.spi.Container;
+import org.glassfish.jersey.server.spi.ContainerLifecycleListener;
+import org.glassfish.jersey.server.spi.ContainerResponseWriter;
+
+/**
+ * Jersey {@link Container} implementation on Netty.
+ *
+ * <p>Every {@link FullHttpRequest} read from a channel is converted into a Jersey
+ * {@link ContainerRequest} and handed to the {@link ApplicationHandler}; the outcome is sent back as
+ * one {@link FullHttpResponse}, after which the connection is closed. Messages of any other type are
+ * forwarded to the next handler in the pipeline.
+ */
+@Sharable
+public class NettyRestHandlerContainer extends ChannelDuplexHandler implements Container {
+
+  private static final Log LOG = LogFactory.getLog(NettyRestHandlerContainer.class);
+
+  // Defaults to "/" so that base URIs are well-formed even if setRootPath() is never called.
+  private String rootPath = "/";
+
+  // volatile: reload() swaps these while this sharable handler may be serving other channels.
+  private volatile ApplicationHandler applicationHandler;
+  private volatile ContainerLifecycleListener lifecycleListener;
+
+  NettyRestHandlerContainer(Application application) {
+    this(new ApplicationHandler(application));
+  }
+
+  NettyRestHandlerContainer(Application application, ServiceLocator parentLocator) {
+    this(new ApplicationHandler(application, null, parentLocator));
+  }
+
+  NettyRestHandlerContainer(ApplicationHandler appHandler) {
+    applicationHandler = appHandler;
+    lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler);
+  }
+
+  @Override
+  public ResourceConfig getConfiguration() {
+    return applicationHandler.getConfiguration();
+  }
+
+  @Override
+  public void reload() {
+    reload(getConfiguration());
+  }
+
+  /**
+   * Replaces the application handler with a new one built from {@code configuration}, notifying the
+   * old lifecycle listener of the shutdown and the new one of the reload and startup.
+   */
+  @Override
+  public void reload(ResourceConfig configuration) {
+    lifecycleListener.onShutdown(this);
+    applicationHandler = new ApplicationHandler(configuration);
+    lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler);
+    lifecycleListener.onReload(this);
+    lifecycleListener.onStartup(this);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("NettyRestHandlerContainer reloaded.");
+    }
+  }
+
+  /**
+   * Sets the path component of the base URI. A {@code null} or empty value becomes "/", and a
+   * trailing '/' is appended when missing.
+   */
+  public void setRootPath(String rootPath) {
+    String tempRootPath = rootPath;
+    if (tempRootPath == null || tempRootPath.isEmpty()) {
+      tempRootPath = "/";
+    } else if (tempRootPath.charAt(tempRootPath.length() - 1) != '/') {
+      tempRootPath += "/";
+    }
+    this.rootPath = tempRootPath;
+  }
+
+  /**
+   * Builds the base URI of a request. The scheme is "https" when the pipeline contains an
+   * {@link SslHandler} and "http" otherwise; the authority comes from the first Host header or,
+   * if there is none, from the channel's local address.
+   *
+   * @throws IllegalArgumentException if the resulting URI is syntactically invalid
+   */
+  private URI getBaseUri(ChannelHandlerContext ctx, FullHttpRequest request) {
+    URI baseUri;
+    String scheme;
+
+    if (ctx.pipeline().get(SslHandler.class) == null) {
+      scheme = "http";
+    } else {
+      scheme = "https";
+    }
+
+    List<String> hosts = request.headers().getAll(HttpHeaders.Names.HOST);
+    try {
+      if (hosts != null && hosts.size() > 0) {
+        baseUri = new URI(scheme + "://" + hosts.get(0) + rootPath);
+      } else {
+        InetSocketAddress localAddress = (InetSocketAddress) ctx.channel().localAddress();
+        baseUri = new URI(scheme, null, localAddress.getHostName(), localAddress.getPort(),
+                    rootPath, null, null);
+      }
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    return baseUri;
+  }
+
+  /**
+   * Translates the Netty request into a Jersey {@link ContainerRequest} and lets the application
+   * handler process it. If the response writer has not been committed by the time processing
+   * returns (or fails), an error response is sent and the unused response buffer is released.
+   */
+  protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+    URI baseUri = getBaseUri(ctx, request);
+    URI requestUri = baseUri.resolve(request.getUri());
+    ByteBuf responseContent = PooledByteBufAllocator.DEFAULT.buffer();
+    FullHttpResponse response =
+        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, responseContent);
+
+    NettyRestResponseWriter responseWriter = new NettyRestResponseWriter(ctx, response);
+    // Everything after the buffer allocation runs inside try/finally so the pooled buffer is
+    // always either written to the channel or released.
+    try {
+      ContainerRequest containerRequest = new ContainerRequest(baseUri, requestUri,
+          request.getMethod().name(), getSecurityContext(), new MapPropertiesDelegate());
+      containerRequest.setEntityStream(new ByteBufInputStream(request.content()));
+
+      HttpHeaders httpHeaders = request.headers();
+      for (String headerName: httpHeaders.names()) {
+        List<String> headerValues = httpHeaders.getAll(headerName);
+        containerRequest.headers(headerName, headerValues);
+      }
+      containerRequest.setWriter(responseWriter);
+      applicationHandler.handle(containerRequest);
+    } finally {
+      responseWriter.releaseConnection();
+    }
+  }
+
+  /**
+   * Handles {@link FullHttpRequest} messages (releasing them afterwards) and forwards every other
+   * message untouched to the next handler.
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    boolean needRelease = true;
+    try {
+      if (msg instanceof FullHttpRequest) {
+        FullHttpRequest request = (FullHttpRequest) msg;
+        messageReceived(ctx, request);
+      } else {
+        // Ownership moves to the next handler, so the message must not be released here.
+        needRelease = false;
+        ctx.fireChannelRead(msg);
+      }
+    } finally {
+      if (needRelease) {
+        ReferenceCountUtil.release(msg);
+      }
+    }
+  }
+
+  /**
+   * Returns a security context without any authentication information: no principal, no roles,
+   * no scheme, and {@code isSecure()} is always {@code false}.
+   */
+  private SecurityContext getSecurityContext() {
+    return new SecurityContext() {
+
+      @Override
+      public boolean isUserInRole(String role) {
+        return false;
+      }
+
+      @Override
+      public boolean isSecure() {
+        return false;
+      }
+
+      @Override
+      public Principal getUserPrincipal() {
+        return null;
+      }
+
+      @Override
+      public String getAuthenticationScheme() {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Internal class for writing content on REST service.
+   *
+   * <p>Jersey writes the entity into the content buffer of the wrapped response; {@link #commit()}
+   * then sends that response and closes the connection.
+   */
+  static class NettyRestResponseWriter implements ContainerResponseWriter {
+
+    private final ChannelHandlerContext ctx;
+    private final FullHttpResponse response;
+    /** Set once the exchange has been finished by commit(), failure() or releaseConnection(). */
+    private final AtomicBoolean closed;
+    /** Set once the response was either handed to the channel or released by this writer. */
+    private final AtomicBoolean responseConsumed;
+
+    public NettyRestResponseWriter(ChannelHandlerContext ctx, FullHttpResponse response) {
+      this.ctx = ctx;
+      this.response = response;
+      this.closed = new AtomicBoolean(false);
+      this.responseConsumed = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void commit() {
+      if (closed.compareAndSet(false, true)) {
+        responseConsumed.set(true);
+        // A FullHttpResponse already is the last content of the message, so no separate
+        // LastHttpContent is written. Closing from the listener of this very write makes sure
+        // the connection is closed only after the response has been written.
+        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+      }
+    }
+
+    @Override
+    public boolean enableResponseBuffering() {
+      return false;
+    }
+
+    /**
+     * Sends a 500 response describing {@code error} and closes the channel. The writer is marked
+     * as closed so that a later {@link #commit()} cannot send a second response.
+     */
+    @Override
+    public void failure(Throwable error) {
+      closed.set(true);
+      try {
+        sendError(HttpResponseStatus.INTERNAL_SERVER_ERROR, error);
+      } finally {
+        if (ctx.channel().isActive()) {
+          ctx.close();
+        }
+      }
+    }
+
+    private void sendError(HttpResponseStatus status, final Throwable error) {
+      // Throwable.getMessage() may be null; fall back to the reason phrase of the status.
+      String message = (error == null) ? null : error.getMessage();
+      if (message == null) {
+        message = status.reasonPhrase();
+      }
+      FullHttpResponse errorResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+          Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+      errorResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+      ChannelPromise promise = ctx.newPromise();
+      promise.addListener(new GenericFutureListener<ChannelFuture>() {
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (!future.isSuccess()) {
+            throw new ContainerException(error);
+          }
+        }
+      });
+
+      ctx.writeAndFlush(errorResponse, promise);
+    }
+
+    @Override
+    public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException {
+      throw new UnsupportedOperationException("setSuspendTimeout is not supported on this container.");
+    }
+
+    @Override
+    public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) {
+      throw new UnsupportedOperationException("suspend is not supported on this container.");
+    }
+
+    /**
+     * Copies the status and headers of the Jersey response onto the Netty response.
+     *
+     * @return a stream that appends the entity to the content buffer of the Netty response
+     */
+    @Override
+    public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse context)
+        throws ContainerException {
+      MultivaluedMap<String, String> responseHeaders = context.getStringHeaders();
+      HttpHeaders nettyHeaders = response.headers();
+
+      for (Entry<String, List<String>> entry: responseHeaders.entrySet()) {
+        nettyHeaders.add(entry.getKey(), entry.getValue());
+      }
+
+      int status = context.getStatus();
+
+      response.setStatus(HttpResponseStatus.valueOf(status));
+      return new ByteBufOutputStream(response.content());
+    }
+
+    /**
+     * Called once request processing has returned. Reports an error to the client if nothing was
+     * committed, and releases the response buffer if it was never handed to the channel.
+     */
+    private void releaseConnection() {
+      if (closed.compareAndSet(false, true)) {
+        String warnMessage = "ResponseWriter did not be commited.";
+        LOG.warn(warnMessage);
+        failure(new IllegalStateException(warnMessage));
+      }
+      // The response is released here rather than in failure() because Jersey may still touch
+      // the entity stream between failure() and the end of request processing.
+      if (responseConsumed.compareAndSet(false, true)) {
+        ReferenceCountUtil.release(response);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java
new file mode 100644
index 0000000..7481cfb
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty;
+
+import io.netty.channel.ChannelHandler;
+
+import javax.ws.rs.ProcessingException;
+
+import org.glassfish.jersey.server.ApplicationHandler;
+import org.glassfish.jersey.server.spi.ContainerProvider;
+
+/**
+ * Container Provider for NettyRestHandlerContainer
+ */
+public final class NettyRestHandlerContainerProvider implements ContainerProvider {
+
+  /**
+   * Creates a {@link NettyRestHandlerContainer} when it can be returned as the requested type.
+   *
+   * @param type requested container type; may be {@code null}
+   * @param application application handler the new container delegates to
+   * @return a new container, or {@code null} when {@code type} is {@code null}, is not a
+   *         {@link ChannelHandler} type, or cannot hold a {@link NettyRestHandlerContainer}
+   */
+  @Override
+  public <T> T createContainer(Class<T> type, ApplicationHandler application) throws ProcessingException {
+    // Only NettyRestHandlerContainer itself and its ChannelHandler supertypes are supported.
+    // Any other ChannelHandler implementation must yield null; otherwise cast() below would
+    // fail with a ClassCastException.
+    if (type == null
+        || !ChannelHandler.class.isAssignableFrom(type)
+        || !type.isAssignableFrom(NettyRestHandlerContainer.class)) {
+      return null;
+    }
+    return type.cast(new NettyRestHandlerContainer(application));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java
new file mode 100644
index 0000000..f7fe148
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty;
+
+import io.netty.channel.ChannelHandler;
+import java.net.InetSocketAddress;
+
+import org.apache.tajo.rpc.NettyServerBase;
+
+/**
+ * JAX-RS Http Server on Netty implementation.
+ */
+public class NettyRestServer extends NettyServerBase {
+  
+  private ChannelHandler handler;
+  private int workerCount;
+
+  public NettyRestServer(InetSocketAddress address, int workerCount) {
+    this("NettyRestService", address, workerCount);
+  }
+
+  public NettyRestServer(String serviceName, InetSocketAddress address, int workerCount) {
+    super(serviceName, address);
+    
+    this.workerCount = workerCount;
+  }
+
+  public ChannelHandler getHandler() {
+    return handler;
+  }
+
+  public void setHandler(ChannelHandler handler) {
+    this.handler = handler;
+  }
+
+  /**
+   * Bind desired port and start network service. Before starting network service, {@link NettyRestServer}
+   * will initialize its configuration.
+   * 
+   */
+  @Override
+  public void start() {
+    if (handler == null) {
+      throw new IllegalStateException("ChannelHandler is null.");
+    }
+    
+    super.init(new NettyRestChannelInitializer(handler), workerCount);
+    super.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java
new file mode 100644
index 0000000..5d2eea1
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.glassfish.hk2.api.ServiceLocator;
+import org.glassfish.jersey.server.ResourceConfig;
+
+/**
+ * Factory class for creating {@link NettyRestServer} instances
+ */
+public final class NettyRestServerFactory {
+
+  /**
+   * Creates and starts a server for the given JAX-RS configuration.
+   *
+   * @see #createNettyRestServer(URI, ResourceConfig, ServiceLocator, int, boolean)
+   */
+  public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, int workerCount) {
+    return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration), workerCount, true);
+  }
+
+  /**
+   * Creates a server for the given JAX-RS configuration and starts it if {@code start} is true.
+   *
+   * @see #createNettyRestServer(URI, ResourceConfig, ServiceLocator, int, boolean)
+   */
+  public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, int workerCount,
+      boolean start) {
+    return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration), workerCount, start);
+  }
+
+  /**
+   * Creates and starts a server for the given JAX-RS configuration, using {@code parentLocator}
+   * as the parent service locator of the application.
+   *
+   * @see #createNettyRestServer(URI, ResourceConfig, ServiceLocator, int, boolean)
+   */
+  public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration,
+      ServiceLocator parentLocator, int workerCount) {
+    return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration, parentLocator), workerCount, true);
+  }
+
+  /**
+   * Creates a server for the given JAX-RS configuration, using {@code parentLocator} as the parent
+   * service locator of the application, and starts it if {@code start} is true.
+   *
+   * @param uri base URI of the service; its scheme must be http or https and it must carry a port.
+   *            The port is used to bind the server and the path becomes the root path.
+   * @param configuration JAX-RS application configuration
+   * @param parentLocator parent service locator handed to the application handler
+   * @param workerCount worker count passed to the server
+   * @param start whether the returned server should already be started
+   * @return the created server
+   * @throws IllegalArgumentException if {@code uri} is null, has no or an unsupported scheme,
+   *         or has no port
+   */
+  public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration,
+      ServiceLocator parentLocator, int workerCount, boolean start) {
+    return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration, parentLocator), workerCount, start);
+  }
+
+  /**
+   * Creates {@link NettyRestServer} instances with JAX-RS application.
+   *
+   * @param uri base URI of the service; scheme (http or https) and port are mandatory
+   * @param handler container that serves the requests; receives the path of {@code uri} as root path
+   * @param workerCount worker count passed to the server
+   * @param start whether to start the server before returning it
+   * @return the created server, bound to the port of {@code uri} on the wildcard address
+   * @throws IllegalArgumentException if {@code uri} is null, has no or an unsupported scheme,
+   *         or has no port
+   */
+  private static NettyRestServer createNettyRestServer(URI uri, NettyRestHandlerContainer handler, int workerCount,
+      boolean start) {
+    if (uri == null) {
+      throw new IllegalArgumentException("uri is null.");
+    }
+
+    // getScheme() returns null for scheme-less (relative) URIs; report that as an invalid
+    // argument instead of failing with a NullPointerException.
+    String schemeString = uri.getScheme();
+    if (schemeString == null
+        || (!schemeString.equalsIgnoreCase("http") && !schemeString.equalsIgnoreCase("https"))) {
+      throw new IllegalArgumentException("scheme of this uri (" + uri.toString() + ") should be http or https.");
+    }
+
+    int port = uri.getPort();
+    if (port == -1) {
+      throw new IllegalArgumentException("Port number should be provided.");
+    }
+
+    handler.setRootPath(uri.getPath());
+
+    InetSocketAddress bindAddress = new InetSocketAddress(port);
+    NettyRestServer nettyRestServer = new NettyRestServer("Tajo-REST", bindAddress, workerCount);
+
+    nettyRestServer.setHandler(handler);
+    nettyRestServer.addListener(new NettyRestServerListener(handler));
+
+    if (start) {
+      nettyRestServer.start();
+    }
+
+    return nettyRestServer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java
new file mode 100644
index 0000000..ecd5bb0
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty;
+
+import org.apache.tajo.rpc.RpcEventListener;
+import org.glassfish.jersey.server.ApplicationHandler;
+import org.glassfish.jersey.server.internal.ConfigHelper;
+import org.glassfish.jersey.server.spi.Container;
+import org.glassfish.jersey.server.spi.ContainerLifecycleListener;
+
+/**
+ * {@link RpcEventListener} that forwards server start and shutdown events to the lifecycle
+ * listener of a JAX-RS {@link Container}. All other events are ignored.
+ */
+public class NettyRestServerListener implements RpcEventListener {
+
+  private Container container;
+
+  public NettyRestServerListener(Container container) {
+    this.container = container;
+  }
+
+  /**
+   * Looks up the lifecycle listener through a new {@link ApplicationHandler} that is built from
+   * the container's current configuration on every call.
+   */
+  private ContainerLifecycleListener resolveLifecycleListener() {
+    ApplicationHandler handler = new ApplicationHandler(container.getConfiguration());
+    return ConfigHelper.getContainerLifecycleListener(handler);
+  }
+
+  @Override
+  public void onAfterInit(Object obj) {
+    // nothing to do
+  }
+
+  @Override
+  public void onAfterShutdown(Object obj) {
+    resolveLifecycleListener().onShutdown(container);
+  }
+
+  @Override
+  public void onAfterStart(Object obj) {
+    resolveLifecycleListener().onStartup(container);
+  }
+
+  @Override
+  public void onBeforeInit(Object obj) {
+    // nothing to do
+  }
+
+  @Override
+  public void onBeforeShutdown(Object obj) {
+    // nothing to do
+  }
+
+  @Override
+  public void onBeforeStart(Object obj) {
+    // nothing to do
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java
new file mode 100644
index 0000000..26086d4
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty.gson;
+
+import javax.ws.rs.core.Feature;
+import javax.ws.rs.core.FeatureContext;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.MessageBodyWriter;
+
+/**
+ * JAX-RS {@link Feature} that registers the Gson based message body reader and writer.
+ */
+public class GsonFeature implements Feature {
+
+  /**
+   * Registers {@link GsonReader} as {@link MessageBodyReader} and {@link GsonWriter} as
+   * {@link MessageBodyWriter}.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean configure(FeatureContext context) {
+    context.register(GsonReader.class, MessageBodyReader.class);
+    context.register(GsonWriter.class, MessageBodyWriter.class);
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java
new file mode 100644
index 0000000..4d6e440
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import java.io.*;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+/**
+ * Custom message body reader with Gson feature.
+ */
+@Consumes(MediaType.APPLICATION_JSON)
+public class GsonReader<T> implements MessageBodyReader<T> {
+
+  // One shared instance instead of building a new Gson for every request.
+  private static final Gson GSON = new GsonBuilder().create();
+
+  /**
+   * @return {@code true} if {@code mediaType} is a JSON type according to {@link GsonUtil}
+   */
+  @Override
+  public boolean isReadable(Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) {
+    return GsonUtil.isJsonType(mediaType);
+  }
+
+  /**
+   * Deserializes the request entity from JSON into an object of the given generic {@code type}.
+   * The entity stream is decoded as UTF-8.
+   */
+  @Override
+  public T readFrom(Class<T> aClass, Type type, Annotation[] annotations, MediaType mediaType,
+                    MultivaluedMap<String, String> multivaluedMap, InputStream inputStream)
+      throws IOException, WebApplicationException {
+    // Decode explicitly as UTF-8; without a charset the reader would use the platform default.
+    Reader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
+    return GSON.fromJson(reader, type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java
new file mode 100644
index 0000000..f16cb96
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty.gson;
+
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Helper methods shared by the Gson message body reader and writer.
+ */
+public class GsonUtil {
+
+  private static final String JSON_SUFFIX = "+json";
+
+  /**
+   * Tells whether a media type denotes JSON content. The comparison ignores case.
+   *
+   * @param mediaType media type to inspect; may be {@code null}
+   * @return {@code true} if the subtype is "json" or ends with "+json"; {@code false} otherwise,
+   *         including when {@code mediaType} is {@code null}
+   */
+  public static boolean isJsonType(MediaType mediaType) {
+    if (mediaType == null) {
+      return false;
+    }
+    String subType = mediaType.getSubtype();
+    if (subType == null) {
+      return false;
+    }
+    // regionMatches() compares the tail case-insensitively, consistent with the "json" check,
+    // and returns false when the subtype is shorter than the suffix.
+    return "json".equalsIgnoreCase(subType)
+        || subType.regionMatches(true, subType.length() - JSON_SUFFIX.length(),
+            JSON_SUFFIX, 0, JSON_SUFFIX.length());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java
new file mode 100644
index 0000000..d215611
--- /dev/null
+++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.netty.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import java.io.*;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+/**
+ * custom message body writer with Gson feature.
+ */
+@Produces(MediaType.APPLICATION_JSON)
+public class GsonWriter<T> implements MessageBodyWriter<T> {
+
+  // One shared instance instead of building a new Gson for every response.
+  private static final Gson GSON = new GsonBuilder().create();
+
+  /**
+   * @return {@code true} if {@code mediaType} is a JSON type according to {@link GsonUtil}
+   */
+  @Override
+  public boolean isWriteable(Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) {
+    return GsonUtil.isJsonType(mediaType);
+  }
+
+  /**
+   * @return -1, because the serialized length is not known before writing
+   */
+  @Override
+  public long getSize(T t, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) {
+    // 0 would claim an empty entity; -1 is the value JAX-RS documents for "unknown".
+    return -1;
+  }
+
+  /**
+   * Serializes {@code t} as JSON, using the given generic {@code type}, to the output stream.
+   * The output is encoded as UTF-8 and flushed, but the stream is not closed.
+   */
+  @Override
+  public void writeTo(T t, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType,
+                      MultivaluedMap<String, Object> multivaluedMap, OutputStream outputStream)
+      throws IOException, WebApplicationException {
+    // Encode explicitly as UTF-8; without a charset the writer would use the platform default.
+    Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream, "UTF-8"));
+
+    GSON.toJson(t, type, writer);
+    writer.flush();
+  }
+}