You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/18 18:25:48 UTC

[09/13] tajo git commit: TAJO-1337: Implements common modules to handle RESTful API

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
new file mode 100644
index 0000000..ed6b634
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class RpcChannelFactory {
+  private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
+  
+  private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
+
+  private static final Object lockObjectForLoopGroup = new Object();
+  private static AtomicInteger serverCount = new AtomicInteger(0);
+
+  public enum ClientChannelId {
+    CLIENT_DEFAULT,
+    FETCHER
+  }
+
+  private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
+      new ConcurrentHashMap<ClientChannelId, Integer>();
+  private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
+      new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
+
+  private RpcChannelFactory(){
+  }
+  
+  static {
+    Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
+
+    defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
+    defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
+  }
+
+  /**
+  * make this factory static thus all clients can share its thread pool.
+  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+  */
+  public static EventLoopGroup getSharedClientEventloopGroup() {
+    return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
+  }
+  
+  /**
+  * make this factory static thus all clients can share its thread pool.
+  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+  *
+  * @param workerNum The number of workers
+  */
+  public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
+    //shared woker and boss pool
+    return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
+  }
+
+  /**
+   * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput.
+   *
+   * @param clientId
+   * @param workerNum
+   * @return
+   */
+  public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
+    Queue<EventLoopGroup> eventLoopGroupQueue;
+    EventLoopGroup returnEventLoopGroup;
+
+    synchronized (lockObjectForLoopGroup) {
+      eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
+      if (eventLoopGroupQueue == null) {
+        eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
+      }
+
+      returnEventLoopGroup = eventLoopGroupQueue.poll();
+      if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
+        returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
+      }
+      eventLoopGroupQueue.add(returnEventLoopGroup);
+    }
+
+    return returnEventLoopGroup;
+  }
+
+  protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
+    return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
+  }
+
+  // Client must release the external resources
+  protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
+    int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
+    Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
+    eventLoopGroupPool.put(clientId, loopGroupQueue);
+
+    for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
+      loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
+    }
+
+    return loopGroupQueue;
+  }
+
+  protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
+    }
+
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
+
+    return new NioEventLoopGroup(workerNum, clientFactory);
+  }
+
+  // Client must release the external resources
+  public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
+    name = name + "-" + serverCount.incrementAndGet();
+    if(LOG.isInfoEnabled()){
+      LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
+    }
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
+    ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
+    
+    EventLoopGroup bossGroup =
+        new NioEventLoopGroup(1, bossFactory);
+    EventLoopGroup workerGroup = 
+        new NioEventLoopGroup(workerNum, workerFactory);
+    
+    return new ServerBootstrap().group(bossGroup, workerGroup);
+  }
+
+  public static void shutdownGracefully(){
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Shutdown Shared RPC Pool");
+    }
+
+    synchronized(lockObjectForLoopGroup) {
+      for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
+        for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
+          eventLoopGroup.shutdownGracefully();
+        }
+
+        eventLoopGroupQueue.clear();
+      }
+      eventLoopGroupPool.clear();
+    }
+  }
+  
+  static class CleanUpHandler extends Thread {
+
+    @Override
+    public void run() {
+      RpcChannelFactory.shutdownGracefully();
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java
new file mode 100644
index 0000000..4d72536
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+/**
+ * Event listener for netty code. Users can subscribe events by using this interface.
+ */
+public interface RpcEventListener {
+
+  /**
+   * Performs actions before start.
+   * @param obj Method caller
+   */
+  public void onBeforeStart(Object obj);
+  
+  /**
+   * Performs actions after start.
+   * @param obj Method caller
+   */
+  public void onAfterStart(Object obj);
+  
+  /**
+   * Performs actions before initialization.
+   * @param obj Method caller
+   */
+  public void onBeforeInit(Object obj);
+  
+  /**
+   * Performs actions after initialization.
+   * @param obj Method caller
+   */
+  public void onAfterInit(Object obj);
+  
+  /**
+   * Performs actions before shutdown.
+   * @param obj Method caller
+   */
+  public void onBeforeShutdown(Object obj);
+  
+  /**
+   * Performs actions after shutdown.
+   * @param obj Method caller
+   */
+  public void onAfterShutdown(Object obj);
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
new file mode 100644
index 0000000..152d426
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RpcUtils {
+
+  public static String normalizeInetSocketAddress(InetSocketAddress addr) {
+    return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+  }
+
+  /**
+   * Util method to build socket addr from either:
+   *   <host>
+   *   <host>:<port>
+   *   <fs>://<host>:<port>/<path>
+   */
+  public static InetSocketAddress createSocketAddr(String host, int port) {
+    return new InetSocketAddress(host, port);
+  }
+
+  /**
+   * Returns InetSocketAddress that a client can use to
+   * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+   * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+   * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+   *
+   * @param addr of a listener
+   * @return socket address that a client can use to connect to the server.
+   */
+  public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+    if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+      try {
+        addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+      } catch (UnknownHostException uhe) {
+        // shouldn't get here unless the host doesn't have a loopback iface
+        addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+      }
+    }
+    InetSocketAddress canonicalAddress =
+        new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort());
+    return canonicalAddress;
+  }
+
+  public static InetSocketAddress createUnresolved(String addr) {
+    String [] splitted = addr.split(":");
+    return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+  }
+
+  public static class Timer {
+    private long remaining;
+    private long prev;
+    public Timer(long timeout) {
+      this.remaining = timeout;
+      this.prev = System.currentTimeMillis();
+    }
+
+    public boolean isTimedOut() {
+      return remaining <= 0;
+    }
+
+    public void elapsed() {
+      long current = System.currentTimeMillis();
+      remaining -= (prev - current);
+      prev = current;
+    }
+
+    public void interval(long wait) {
+      if (wait <= 0 || isTimedOut()) {
+        return;
+      }
+      try {
+        Thread.sleep(Math.min(remaining, wait));
+      } catch (Exception ex) {
+        // ignore
+      }
+    }
+
+    public long remaining() {
+      return remaining;
+    }
+  }
+
+  public static class Scrutineer<T> {
+
+    private final AtomicReference<T> reference = new AtomicReference<T>();
+
+    T check(T ticket) {
+      T granted = reference.get();
+      for (;granted == null; granted = reference.get()) {
+        if (reference.compareAndSet(null, ticket)) {
+          return ticket;
+        }
+      }
+      return granted;
+    }
+
+    boolean clear(T granted) {
+      return reference.compareAndSet(granted, null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/pom.xml b/tajo-rpc/tajo-rpc-protobuf/pom.xml
new file mode 100644
index 0000000..1f67255
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/pom.xml
@@ -0,0 +1,274 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+    <groupId>org.apache.tajo</groupId>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <packaging>jar</packaging>
+  <artifactId>tajo-rpc-protobuf</artifactId>
+  <name>Tajo Protocol Buffer Rpc</name>
+  <description>RPC Server/Client Implementation based on Netty and Protocol Buffer</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+        </configuration>
+        <executions>
+          <execution>
+            <id>create-jar</id>
+            <phase>prepare-package</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/DummyProtos.proto</argument>
+                <argument>src/main/proto/RpcProtos.proto</argument>
+                <argument>src/main/proto/TestProtos.proto</argument>
+                <argument>src/main/proto/TestProtocol.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-rpc-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+			</url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <target>
+                    <echo file="${project.build.directory}/dist-layout-stitching.sh">
+                      run() {
+                      echo "\$ ${@}"
+                      "${@}"
+                      res=$?
+                      if [ $res != 0 ]; then
+                      echo
+                      echo "Failed!"
+                      echo
+                      exit $res
+                      fi
+                      }
+
+                      ROOT=`cd ${basedir}/..;pwd`
+                      echo
+                      echo "Current directory `pwd`"
+                      echo
+                      run rm -rf ${project.artifactId}-${project.version}
+                      run mkdir ${project.artifactId}-${project.version}
+                      run cd ${project.artifactId}-${project.version}
+                      run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+                      echo
+                      echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+                      echo
+                    </echo>
+                    <exec executable="sh" dir="${project.build.directory}" failonerror="true">
+                      <arg line="./dist-layout-stitching.sh" />
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
new file mode 100644
index 0000000..3d856ce
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.*;
+
+import io.netty.channel.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class AsyncRpcClient extends NettyClientBase {
+  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
+
+  private final ConcurrentMap<Integer, ResponseCallback> requests =
+      new ConcurrentHashMap<Integer, ResponseCallback>();
+
+  private final Method stubMethod;
+  private final ProxyRpcChannel rpcChannel;
+  private final ClientChannelInboundHandler inboundHandler;
+
+  /**
+   * Intentionally make this method package-private, avoiding user directly
+   * new an instance through this constructor.
+   */
+  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+      throws ClassNotFoundException, NoSuchMethodException {
+    super(rpcConnectionKey, retries);
+    stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
+    rpcChannel = new ProxyRpcChannel();
+    inboundHandler = new ClientChannelInboundHandler();
+    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
+  }
+
+  @Override
+  public <T> T getStub() {
+    return getStub(stubMethod, rpcChannel);
+  }
+
+  protected void sendExceptions(String message) {
+    for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
+      ResponseCallback callback = callbackEntry.getValue();
+      Integer id = callbackEntry.getKey();
+
+      RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
+          .setErrorMessage(message)
+          .setId(id);
+
+      callback.run(responseBuilder.build());
+    }
+  }
+
+  @Override
+  public void close() {
+    sendExceptions("AsyncRpcClient terminates all the connections");
+
+    super.close();
+  }
+
+  private class ProxyRpcChannel implements RpcChannel {
+
+    public void callMethod(final MethodDescriptor method,
+                           final RpcController controller,
+                           final Message param,
+                           final Message responseType,
+                           RpcCallback<Message> done) {
+
+      int nextSeqId = sequence.getAndIncrement();
+
+      Message rpcRequest = buildRequest(nextSeqId, method, param);
+
+      inboundHandler.registerCallback(nextSeqId,
+          new ResponseCallback(controller, responseType, done));
+
+      ChannelPromise channelPromise = getChannel().newPromise();
+      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (!future.isSuccess()) {
+            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
+          }
+        }
+      });
+      getChannel().writeAndFlush(rpcRequest, channelPromise);
+    }
+
+    private Message buildRequest(int seqId,
+                                 MethodDescriptor method,
+                                 Message param) {
+
+      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+          .setId(seqId)
+          .setMethodName(method.getName());
+
+      if (param != null) {
+          requestBuilder.setRequestMessage(param.toByteString());
+      }
+
+      return requestBuilder.build();
+    }
+  }
+
+  private class ResponseCallback implements RpcCallback<RpcResponse> {
+    private final RpcController controller;
+    private final Message responsePrototype;
+    private final RpcCallback<Message> callback;
+
+    public ResponseCallback(RpcController controller,
+                            Message responsePrototype,
+                            RpcCallback<Message> callback) {
+      this.controller = controller;
+      this.responsePrototype = responsePrototype;
+      this.callback = callback;
+    }
+
+    @Override
+    public void run(RpcResponse rpcResponse) {
+      // if hasErrorMessage is true, it means rpc-level errors.
+      // it does not call the callback function\
+      if (rpcResponse.hasErrorMessage()) {
+        if (controller != null) {
+          this.controller.setFailed(rpcResponse.getErrorMessage());
+        }
+        callback.run(null);
+      } else { // if rpc call succeed
+        try {
+          Message responseMessage;
+          if (!rpcResponse.hasResponseMessage()) {
+            responseMessage = null;
+          } else {
+            responseMessage = responsePrototype.newBuilderForType().mergeFrom(
+                rpcResponse.getResponseMessage()).build();
+          }
+
+          callback.run(responseMessage);
+
+        } catch (InvalidProtocolBufferException e) {
+          throw new RemoteException(getErrorMessage(""), e);
+        }
+      }
+    }
+  }
+
+  private String getErrorMessage(String message) {
+    return "Exception [" + protocol.getCanonicalName() +
+        "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
+        getChannel().remoteAddress()) + ")]: " + message;
+  }
+
+  @ChannelHandler.Sharable
+  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
+
+    void registerCallback(int seqId, ResponseCallback callback) {
+
+      if (requests.putIfAbsent(seqId, callback) != null) {
+        throw new RemoteException(
+            getErrorMessage("Duplicate Sequence Id "+ seqId));
+      }
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+      if (msg instanceof RpcResponse) {
+        try {
+          RpcResponse response = (RpcResponse) msg;
+          ResponseCallback callback = requests.remove(response.getId());
+
+          if (callback == null) {
+            LOG.warn("Dangling rpc call");
+          } else {
+            callback.run(response);
+          }
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+      LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause);
+
+      sendExceptions(cause.getMessage());
+      
+      if(LOG.isDebugEnabled()) {
+        LOG.error(cause.getMessage(), cause);
+      } else {
+        LOG.error("RPC Exception:" + cause.getMessage());
+      }
+      
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
new file mode 100644
index 0000000..3b5a747
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+
+import io.netty.channel.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+public class AsyncRpcServer extends NettyServerBase {
+  private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
+
+  private final Service service;
+  private final ChannelInitializer<Channel> initializer;
+
+  public AsyncRpcServer(final Class<?> protocol,
+                        final Object instance,
+                        final InetSocketAddress bindAddress,
+                        final int workerNum)
+      throws Exception {
+    super(protocol.getSimpleName(), bindAddress);
+
+    String serviceClassName = protocol.getName() + "$" +
+        protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface");
+    Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
+    this.service = (Service) method.invoke(null, instance);
+
+    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+    super.init(this.initializer, workerNum);
+  }
+
+  @ChannelHandler.Sharable
+  private class ServerHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.add(ctx.channel());
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+      }
+      super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.remove(ctx.channel());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+      }
+      super.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+      if (msg instanceof RpcRequest) {
+        try {
+          final RpcRequest request = (RpcRequest) msg;
+
+          String methodName = request.getMethodName();
+          MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+          if (methodDescriptor == null) {
+            throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+          }
+
+          Message paramProto = null;
+          if (request.hasRequestMessage()) {
+            try {
+              paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+                  .mergeFrom(request.getRequestMessage()).build();
+            } catch (Throwable t) {
+              throw new RemoteCallException(request.getId(), methodDescriptor, t);
+            }
+          }
+
+          final RpcController controller = new NettyRpcController();
+
+          RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() {
+
+            public void run(Message returnValue) {
+
+              RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+
+              if (returnValue != null) {
+                builder.setResponseMessage(returnValue.toByteString());
+              }
+
+              if (controller.failed()) {
+                builder.setErrorMessage(controller.errorText());
+              }
+
+              ctx.writeAndFlush(builder.build());
+            }
+          };
+
+          service.callMethod(methodDescriptor, controller, paramProto, callback);
+
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception{
+      if (cause instanceof RemoteCallException) {
+        RemoteCallException callException = (RemoteCallException) cause;
+        ctx.writeAndFlush(callException.getResponse());
+      } else {
+        LOG.error(cause.getMessage());
+      }
+      
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
+      }
+    }
+    
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
new file mode 100644
index 0000000..6a90330
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+
+import io.netty.channel.*;
+import io.netty.util.concurrent.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.Future;
+
+public class BlockingRpcClient extends NettyClientBase {
+  private static final Log LOG = LogFactory.getLog(RpcProtos.class);
+
+  private final Map<Integer, ProtoCallFuture> requests =
+      new ConcurrentHashMap<Integer, ProtoCallFuture>();
+
+  private final Method stubMethod;
+  private final ProxyRpcChannel rpcChannel;
+  private final ChannelInboundHandlerAdapter inboundHandler;
+
+  /**
+   * Intentionally make this method package-private, avoiding user directly
+   * new an instance through this constructor.
+   */
+  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+      throws ClassNotFoundException, NoSuchMethodException {
+    super(rpcConnectionKey, retries);
+    stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
+    rpcChannel = new ProxyRpcChannel();
+    inboundHandler = new ClientChannelInboundHandler();
+    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
+  }
+
+  @Override
+  public <T> T getStub() {
+    return getStub(stubMethod, rpcChannel);
+  }
+
+  @Override
+  public void close() {
+    for(ProtoCallFuture callback: requests.values()) {
+      callback.setFailed("BlockingRpcClient terminates all the connections",
+          new ServiceException("BlockingRpcClient terminates all the connections"));
+    }
+
+    super.close();
+  }
+
+  private class ProxyRpcChannel implements BlockingRpcChannel {
+
+    @Override
+    public Message callBlockingMethod(final MethodDescriptor method,
+                                      final RpcController controller,
+                                      final Message param,
+                                      final Message responsePrototype)
+        throws TajoServiceException {
+
+      int nextSeqId = sequence.getAndIncrement();
+
+      Message rpcRequest = buildRequest(nextSeqId, method, param);
+
+      ProtoCallFuture callFuture =
+          new ProtoCallFuture(controller, responsePrototype);
+      requests.put(nextSeqId, callFuture);
+
+      ChannelPromise channelPromise = getChannel().newPromise();
+      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (!future.isSuccess()) {
+            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
+          }
+        }
+      });
+      getChannel().writeAndFlush(rpcRequest, channelPromise);
+
+      try {
+        return callFuture.get(60, TimeUnit.SECONDS);
+      } catch (Throwable t) {
+        if (t instanceof ExecutionException) {
+          Throwable cause = t.getCause();
+          if (cause != null && cause instanceof TajoServiceException) {
+            throw (TajoServiceException)cause;
+          }
+        }
+        throw new TajoServiceException(t.getMessage());
+      }
+    }
+
+    private Message buildRequest(int seqId,
+                                 MethodDescriptor method,
+                                 Message param) {
+      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+          .setId(seqId)
+          .setMethodName(method.getName());
+
+      if (param != null) {
+        requestBuilder.setRequestMessage(param.toByteString());
+      }
+
+      return requestBuilder.build();
+    }
+  }
+
+  private String getErrorMessage(String message) {
+    if(getChannel() != null) {
+      return protocol.getName() +
+          "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
+          getChannel().remoteAddress()) + "): " + message;
+    } else {
+      return "Exception " + message;
+    }
+  }
+
+  private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
+    if(getChannel() != null) {
+      return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
+          RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
+    } else {
+      return new TajoServiceException(response.getErrorMessage());
+    }
+  }
+
+  @ChannelHandler.Sharable
+  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+
+      if (msg instanceof RpcResponse) {
+        try {
+          RpcResponse rpcResponse = (RpcResponse) msg;
+          ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+
+          if (callback == null) {
+            LOG.warn("Dangling rpc call");
+          } else {
+            if (rpcResponse.hasErrorMessage()) {
+              callback.setFailed(rpcResponse.getErrorMessage(),
+                  makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
+              throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
+            } else {
+              Message responseMessage;
+
+              if (!rpcResponse.hasResponseMessage()) {
+                responseMessage = null;
+              } else {
+                responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
+                    .build();
+              }
+
+              callback.setResponse(responseMessage);
+            }
+          }
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+      for(ProtoCallFuture callback: requests.values()) {
+        callback.setFailed(cause.getMessage(), cause);
+      }
+      
+      if(LOG.isDebugEnabled()) {
+        LOG.error("" + cause.getMessage(), cause);
+      } else {
+        LOG.error("RPC Exception:" + cause.getMessage());
+      }
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
+      }
+    }
+  }
+
+ static class ProtoCallFuture implements Future<Message> {
+    private Semaphore sem = new Semaphore(0);
+    private Message response = null;
+    private Message returnType;
+
+    private RpcController controller;
+
+    private ExecutionException ee;
+
+    public ProtoCallFuture(RpcController controller, Message message) {
+      this.controller = controller;
+      this.returnType = message;
+    }
+
+    @Override
+    public boolean cancel(boolean arg0) {
+      return false;
+    }
+
+    @Override
+    public Message get() throws InterruptedException, ExecutionException {
+      sem.acquire();
+      if(ee != null) {
+        throw ee;
+      }
+      return response;
+    }
+
+    @Override
+    public Message get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if(sem.tryAcquire(timeout, unit)) {
+        if (ee != null) {
+          throw ee;
+        }
+        return response;
+      } else {
+        throw new TimeoutException();
+      }
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return sem.availablePermits() > 0;
+    }
+
+    public void setResponse(Message response) {
+      this.response = response;
+      sem.release();
+    }
+
+    public void setFailed(String errorText, Throwable t) {
+      if(controller != null) {
+        this.controller.setFailed(errorText);
+      }
+      ee = new ExecutionException(errorText, t);
+      sem.release();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
new file mode 100644
index 0000000..0ce359f
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+
+import io.netty.channel.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+public class BlockingRpcServer extends NettyServerBase {
+  private static Log LOG = LogFactory.getLog(BlockingRpcServer.class);
+  private final BlockingService service;
+  private final ChannelInitializer<Channel> initializer;
+
+  public BlockingRpcServer(final Class<?> protocol,
+                           final Object instance,
+                           final InetSocketAddress bindAddress,
+                           final int workerNum)
+      throws Exception {
+
+    super(protocol.getSimpleName(), bindAddress);
+
+    String serviceClassName = protocol.getName() + "$" +
+        protocol.getSimpleName() + "Service";
+    Class<?> serviceClass = Class.forName(serviceClassName);
+    Class<?> interfaceClass = Class.forName(serviceClassName +
+        "$BlockingInterface");
+    Method method = serviceClass.getMethod(
+        "newReflectiveBlockingService", interfaceClass);
+
+    this.service = (BlockingService) method.invoke(null, instance);
+    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+
+    super.init(this.initializer, workerNum);
+  }
+
+  @ChannelHandler.Sharable
+  private class ServerHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.add(ctx.channel());
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+      }
+      super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.remove(ctx.channel());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+      }
+      super.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+
+      if (msg instanceof RpcRequest) {
+        try {
+          final RpcRequest request = (RpcRequest) msg;
+
+          String methodName = request.getMethodName();
+          MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+          if (methodDescriptor == null) {
+            throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+          }
+          Message paramProto = null;
+          if (request.hasRequestMessage()) {
+            try {
+              paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+                  .mergeFrom(request.getRequestMessage()).build();
+
+            } catch (Throwable t) {
+              throw new RemoteCallException(request.getId(), methodDescriptor, t);
+            }
+          }
+          Message returnValue;
+          RpcController controller = new NettyRpcController();
+
+          try {
+            returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
+          } catch (Throwable t) {
+            throw new RemoteCallException(request.getId(), methodDescriptor, t);
+          }
+
+          RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+
+          if (returnValue != null) {
+            builder.setResponseMessage(returnValue.toByteString());
+          }
+
+          if (controller.failed()) {
+            builder.setErrorMessage(controller.errorText());
+          }
+          ctx.writeAndFlush(builder.build());
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+      if (cause instanceof RemoteCallException) {
+        RemoteCallException callException = (RemoteCallException) cause;
+        ctx.writeAndFlush(callException.getResponse());
+      }
+      
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
+      }
+    }
+    
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
new file mode 100644
index 0000000..c4c3256
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class CallFuture<T> implements RpcCallback<T>, Future<T> {
+
+  private final Semaphore sem = new Semaphore(0);
+  private boolean done = false;
+  private T response;
+  private RpcController controller;
+
+  public CallFuture() {
+    controller = new DefaultRpcController();
+  }
+
+  public RpcController getController() {
+    return controller;
+  }
+
+  @Override
+  public void run(T t) {
+    this.response = t;
+    done = true;
+    sem.release();
+  }
+
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    controller.startCancel();
+    sem.release();
+    return controller.isCanceled();
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return controller.isCanceled();
+  }
+
+  @Override
+  public boolean isDone() {
+    return done;
+  }
+
+  @Override
+  public T get() throws InterruptedException {
+    sem.acquire();
+
+    return response;
+  }
+
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, TimeoutException {
+    if (sem.tryAcquire(timeout, unit)) {
+      return response;
+    } else {
+      throw new TimeoutException();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
new file mode 100644
index 0000000..4ba19a5
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+public class DefaultRpcController implements RpcController {
+  private String errorText;
+  private boolean error;
+  private boolean canceled;
+
+  @Override
+  public void reset() {
+    errorText = "";
+    error = false;
+    canceled = false;
+  }
+
+  @Override
+  public boolean failed() {
+    return error;
+  }
+
+  @Override
+  public String errorText() {
+    return errorText;
+  }
+
+  @Override
+  public void startCancel() {
+    this.canceled = true;
+  }
+
+  @Override
+  public void setFailed(String s) {
+    this.errorText = s;
+    this.error = true;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return canceled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
new file mode 100644
index 0000000..72278f2
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class NettyClientBase implements Closeable {
+  private static final Log LOG = LogFactory.getLog(NettyClientBase.class);
+  private static final int CONNECTION_TIMEOUT = 60000;  // 60 sec
+  private static final long PAUSE = 1000; // 1 sec
+
+  private final int numRetries;
+
+  private Bootstrap bootstrap;
+  private volatile ChannelFuture channelFuture;
+
+  protected final Class<?> protocol;
+  protected final AtomicInteger sequence = new AtomicInteger(0);
+
+  private final RpcConnectionKey key;
+  private final AtomicInteger counter = new AtomicInteger(0);   // reference counter
+
+  public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
+      throws ClassNotFoundException, NoSuchMethodException {
+    this.key = rpcConnectionKey;
+    this.protocol = rpcConnectionKey.protocolClass;
+    this.numRetries = numRetries;
+  }
+
+  // should be called from sub class
+  protected void init(ChannelInitializer<Channel> initializer) {
+    this.bootstrap = new Bootstrap();
+    this.bootstrap
+      .channel(NioSocketChannel.class)
+      .handler(initializer)
+      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      .option(ChannelOption.SO_REUSEADDR, true)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT)
+      .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
+      .option(ChannelOption.TCP_NODELAY, true);
+  }
+
+  public RpcConnectionPool.RpcConnectionKey getKey() {
+    return key;
+  }
+
+  protected final Class<?> getServiceClass() throws ClassNotFoundException {
+    String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+    return Class.forName(serviceClassName);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected final <T> T getStub(Method stubMethod, Object rpcChannel) {
+    try {
+      return (T) stubMethod.invoke(null, rpcChannel);
+    } catch (Exception e) {
+      throw new RemoteException(e.getMessage(), e);
+    }
+  }
+
+  public abstract <T> T getStub();
+
+  public boolean acquire(long timeout) {
+    if (!checkConnection(timeout)) {
+      return false;
+    }
+    counter.incrementAndGet();
+    return true;
+  }
+
+  public boolean release() {
+    return counter.decrementAndGet() == 0;
+  }
+
+  private boolean checkConnection(long timeout) {
+    if (isConnected()) {
+      return true;
+    }
+
+    InetSocketAddress addr = key.addr;
+    if (addr.isUnresolved()) {
+      addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+    }
+
+    return handleConnectionInternally(addr, timeout);
+  }
+
+  private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+    LOG.warn("Try to connect : " + address);
+    this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
+            .connect(address)
+            .addListener(listener);
+  }
+
+  // first attendant kicks connection
+  private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>();
+
+  private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) {
+    final CountDownLatch ticket = new CountDownLatch(1);
+    final CountDownLatch granted = connect.check(ticket);
+
+    // basically, it's double checked lock
+    if (ticket == granted && isConnected()) {
+      granted.countDown();
+      return true;
+    }
+
+    if (ticket == granted) {
+      connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
+    }
+
+    try {
+      granted.await(timeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // ignore
+    }
+
+    boolean success = channelFuture.isSuccess();
+
+    if (granted.getCount() == 0) {
+      connect.clear(granted);
+    }
+
+    return success;
+  }
+
+  class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
+    private final AtomicInteger retryCount = new AtomicInteger();
+    private final InetSocketAddress address;
+    private final CountDownLatch latch;
+
+    RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) {
+      this.address = address;
+      this.latch = latch;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture channelFuture) throws Exception {
+      if (!channelFuture.isSuccess()) {
+        channelFuture.channel().close();
+
+        if (numRetries > retryCount.getAndIncrement()) {
+          final GenericFutureListener<ChannelFuture> currentListener = this;
+
+          RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() {
+            @Override
+            public void run() {
+              connectUsingNetty(address, currentListener);
+            }
+          }, PAUSE, TimeUnit.MILLISECONDS);
+
+          LOG.debug("Connecting to " + address + " has been failed. Retrying to connect.");
+        }
+        else {
+          latch.countDown();
+
+          LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
+        }
+      }
+      else {
+        latch.countDown();
+      }
+    }
+  }
+
+  public Channel getChannel() {
+    return channelFuture == null ? null : channelFuture.channel();
+  }
+
+  public boolean isConnected() {
+    Channel channel = getChannel();
+    return channel != null && channel.isOpen() && channel.isActive();
+  }
+
+  public SocketAddress getRemoteAddress() {
+    Channel channel = getChannel();
+    return channel == null ? null : channel.remoteAddress();
+  }
+
+  @Override
+  public void close() {
+    Channel channel = getChannel();
+    if (channel != null && channel.isOpen()) {
+      LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+      channel.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
new file mode 100644
index 0000000..b7f4537
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+public class NettyRpcController implements RpcController {
+  private String errorText;
+
+  @Override
+  public void reset() {
+    errorText = null;
+  }
+
+  @Override
+  public boolean failed() {
+    return errorText != null;
+  }
+
+  @Override
+  public String errorText() {
+    return errorText;
+  }
+
+  @Override
+  public void startCancel() {
+    // TODO - to be implemented
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFailed(String s) {
+    errorText = s;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    // TODO - to be implemented
+    return false;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
new file mode 100644
index 0000000..9b7f8ac
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+
+public class NullCallback implements RpcCallback<Object> {
+  private final static NullCallback instance;
+
+  static {
+    instance = new NullCallback();
+  }
+
+  public static RpcCallback get() {
+    return instance;
+  }
+
+  @Override
+  public void run(Object parameter) {
+    // do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
new file mode 100644
index 0000000..6a340dc
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+
+import com.google.protobuf.MessageLite;
+
+class ProtoChannelInitializer extends ChannelInitializer<Channel> {
+  private final MessageLite defaultInstance;
+  private final ChannelHandler handler;
+
+  public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
+    this.handler = handler;
+    this.defaultInstance = defaultInstance;
+  }
+
+  @Override
+  protected void initChannel(Channel channel) throws Exception {
+    ChannelPipeline pipeline = channel.pipeline();
+    pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
+    pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
+    pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
+    pipeline.addLast("protobufEncoder", new ProtobufEncoder());
+    pipeline.addLast("handler", handler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
new file mode 100644
index 0000000..52ef31a
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+
+public class RemoteCallException extends RemoteException {
+  private int seqId;
+  private String originExceptionClass;
+
+  public RemoteCallException(int seqId, MethodDescriptor methodDesc,
+                             Throwable t) {
+    super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t);
+    this.seqId = seqId;
+    if (t != null) {
+      originExceptionClass = t.getClass().getCanonicalName();
+    }
+  }
+
+  public RemoteCallException(int seqId, Throwable t) {
+    super(t);
+    this.seqId = seqId;
+    if (t != null) {
+      originExceptionClass = t.getClass().getCanonicalName();
+    }
+  }
+
+  public RpcResponse getResponse() {
+    RpcResponse.Builder builder = RpcResponse.newBuilder();
+    builder.setId(seqId);
+    if (getCause().getMessage() == null) {
+      builder.setErrorMessage(getCause().getClass().getName());
+    } else {
+      builder.setErrorMessage(getCause().getMessage());
+    }
+    builder.setErrorTrace(getStackTraceString(getCause()));
+    builder.setErrorClass(originExceptionClass);
+
+    return builder.build();
+  }
+
+  private static String getStackTraceString(Throwable aThrowable) {
+    final Writer result = new StringWriter();
+    final PrintWriter printWriter = new PrintWriter(result);
+    aThrowable.printStackTrace(printWriter);
+    return result.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java
new file mode 100644
index 0000000..30c110d
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+public class RemoteException extends RuntimeException {
+  public RemoteException() {
+    super();
+  }
+
+  public RemoteException(String message) {
+    super(message);
+  }
+
+  public RemoteException(Throwable t) {
+    super(t);
+  }
+
+  public RemoteException(String message, Throwable t) {
+    super(message, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
new file mode 100644
index 0000000..3c054ad
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+public class RetriesExhaustedException extends RuntimeException {
+  private static final long serialVersionUID = 1876775844L;
+
+  public RetriesExhaustedException(final String msg) {
+    super(msg);
+  }
+
+  public RetriesExhaustedException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
+  /**
+   * Datastructure that allows adding more info around Throwable incident.
+   */
+  public static class ThrowableWithExtraContext {
+    private final Throwable t;
+    private final long when;
+    private final String extras;
+
+    public ThrowableWithExtraContext(final Throwable t, final long when,
+        final String extras) {
+      this.t = t;
+      this.when = when;
+      this.extras = extras;
+    }
+ 
+    @Override
+    public String toString() {
+      return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
+    }
+  }
+
+  /**
+   * Create a new RetriesExhaustedException from the list of prior failures.
+   * @param callableVitals Details from the {@link ServerCallable} we were using
+   * when we got this exception.
+   * @param numTries The number of tries we made
+   * @param exceptions List of exceptions that failed before giving up
+   */
+  public RetriesExhaustedException(final String callableVitals, int numTries,
+      List<Throwable> exceptions) {
+    super(getMessage(callableVitals, numTries, exceptions));
+  }
+
+  /**
+   * Create a new RetriesExhaustedException from the list of prior failures.
+   * @param numTries
+   * @param exceptions List of exceptions that failed before giving up
+   */
+  public RetriesExhaustedException(final int numTries,
+      final List<Throwable> exceptions) {
+    super(getMessage(numTries, exceptions));
+  }
+
+  private static String getMessage(String callableVitals, int numTries,
+      List<Throwable> exceptions) {
+    StringBuilder buffer = new StringBuilder("Failed contacting ");
+    buffer.append(callableVitals);
+    buffer.append(" after ");
+    buffer.append(numTries + 1);
+    buffer.append(" attempts.\nExceptions:\n");
+    for (Throwable t : exceptions) {
+      buffer.append(t.toString());
+      buffer.append("\n");
+    }
+    return buffer.toString();
+  }
+
+  private static String getMessage(final int numTries,
+      final List<Throwable> exceptions) {
+    StringBuilder buffer = new StringBuilder("Failed after attempts=");
+    buffer.append(numTries + 1);
+    buffer.append(", exceptions:\n");
+    for (Throwable t : exceptions) {
+      buffer.append(t.toString());
+      buffer.append("\n");
+    }
+    return buffer.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
new file mode 100644
index 0000000..6d1f479
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.util.internal.logging.CommonsLoggerFactory;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RpcConnectionPool {
+  private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
+
+  private Map<RpcConnectionKey, NettyClientBase> connections =
+      new HashMap<RpcConnectionKey, NettyClientBase>();
+
+  private static RpcConnectionPool instance;
+  private final Object lockObject = new Object();
+
+  public final static int RPC_RETRIES = 3;
+
+  private RpcConnectionPool() {
+  }
+
+  public synchronized static RpcConnectionPool getPool() {
+    if(instance == null) {
+      InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
+      instance = new RpcConnectionPool();
+    }
+    return instance;
+  }
+
+  private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+    NettyClientBase client;
+    if(rpcConnectionKey.asyncMode) {
+      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES);
+    } else {
+      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES);
+    }
+    return client;
+  }
+
+  public static final long DEFAULT_TIMEOUT = 3000;
+  public static final long DEFAULT_INTERVAL = 500;
+
+  public NettyClientBase getConnection(InetSocketAddress addr,
+                                       Class<?> protocolClass, boolean asyncMode)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+    return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL);
+  }
+
+  public NettyClientBase getConnection(InetSocketAddress addr,
+      Class<?> protocolClass, boolean asyncMode, long timeout, long interval)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+    RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
+
+    RpcUtils.Timer timer = new RpcUtils.Timer(timeout);
+    for (; !timer.isTimedOut(); timer.elapsed()) {
+      NettyClientBase client;
+      synchronized (lockObject) {
+        client = connections.get(key);
+        if (client == null) {
+          connections.put(key, client = makeConnection(key));
+        }
+      }
+      if (client.acquire(timer.remaining())) {
+        return client;
+      }
+      timer.interval(interval);
+    }
+
+    throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec");
+  }
+
+  public void releaseConnection(NettyClientBase client) {
+    release(client, false);
+  }
+
+  public void closeConnection(NettyClientBase client) {
+    release(client, true);
+  }
+
+  private void release(NettyClientBase client, boolean close) {
+    if (client == null) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Close connection [" + client.getKey() + "]");
+    }
+    try {
+      if (returnToPool(client, close)) {
+        client.close();
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Current Connections [" + connections.size() + "]");
+      }
+    } catch (Exception e) {
+      LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
+    }
+  }
+
+  // return true if the connection should be closed
+  private boolean returnToPool(NettyClientBase client, boolean close) {
+    synchronized (lockObject) {
+      if (client.release() && (close || !client.isConnected())) {
+        connections.remove(client.getKey());
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void close() {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Pool Closed");
+    }
+
+    synchronized (lockObject) {
+      for (NettyClientBase eachClient : connections.values()) {
+        try {
+          eachClient.close();
+        } catch (Exception e) {
+          LOG.error("close client pool error", e);
+        }
+      }
+      connections.clear();
+    }
+  }
+
+  public void shutdown(){
+    close();
+    RpcChannelFactory.shutdownGracefully();
+  }
+
+  static class RpcConnectionKey {
+    final InetSocketAddress addr;
+    final Class<?> protocolClass;
+    final boolean asyncMode;
+
+    final String description;
+
+    public RpcConnectionKey(InetSocketAddress addr,
+                            Class<?> protocolClass, boolean asyncMode) {
+      this.addr = addr;
+      this.protocolClass = protocolClass;
+      this.asyncMode = asyncMode;
+      this.description = "["+ protocolClass + "] " + addr + "," + asyncMode;
+    }
+
+    @Override
+    public String toString() {
+      return description;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if(!(obj instanceof RpcConnectionKey)) {
+        return false;
+      }
+
+      return toString().equals(obj.toString());
+    }
+
+    @Override
+    public int hashCode() {
+      return description.hashCode();
+    }
+  }
+}