You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/18 15:57:57 UTC
[3/5] tajo git commit: TAJO-1337: Implements common modules to handle
RESTful API
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
new file mode 100644
index 0000000..ed6b634
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class RpcChannelFactory {
+ private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
+
+ private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
+
+ private static final Object lockObjectForLoopGroup = new Object();
+ private static AtomicInteger serverCount = new AtomicInteger(0);
+
+ public enum ClientChannelId {
+ CLIENT_DEFAULT,
+ FETCHER
+ }
+
+ private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
+ new ConcurrentHashMap<ClientChannelId, Integer>();
+ private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
+ new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
+
+ private RpcChannelFactory(){
+ }
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
+
+ defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
+ defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
+ }
+
+ /**
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ */
+ public static EventLoopGroup getSharedClientEventloopGroup() {
+ return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
+ }
+
+ /**
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ *
+ * @param workerNum The number of workers
+ */
+ public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
+ //shared woker and boss pool
+ return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
+ }
+
+ /**
+ * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput.
+ *
+ * @param clientId
+ * @param workerNum
+ * @return
+ */
+ public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
+ Queue<EventLoopGroup> eventLoopGroupQueue;
+ EventLoopGroup returnEventLoopGroup;
+
+ synchronized (lockObjectForLoopGroup) {
+ eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
+ if (eventLoopGroupQueue == null) {
+ eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
+ }
+
+ returnEventLoopGroup = eventLoopGroupQueue.poll();
+ if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
+ returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
+ }
+ eventLoopGroupQueue.add(returnEventLoopGroup);
+ }
+
+ return returnEventLoopGroup;
+ }
+
+ protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
+ return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
+ }
+
+ // Client must release the external resources
+ protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
+ int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
+ Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
+ eventLoopGroupPool.put(clientId, loopGroupQueue);
+
+ for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
+ loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
+ }
+
+ return loopGroupQueue;
+ }
+
+ protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
+ }
+
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
+
+ return new NioEventLoopGroup(workerNum, clientFactory);
+ }
+
+ // Client must release the external resources
+ public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
+ name = name + "-" + serverCount.incrementAndGet();
+ if(LOG.isInfoEnabled()){
+ LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
+ }
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
+ ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
+
+ EventLoopGroup bossGroup =
+ new NioEventLoopGroup(1, bossFactory);
+ EventLoopGroup workerGroup =
+ new NioEventLoopGroup(workerNum, workerFactory);
+
+ return new ServerBootstrap().group(bossGroup, workerGroup);
+ }
+
+ public static void shutdownGracefully(){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Shutdown Shared RPC Pool");
+ }
+
+ synchronized(lockObjectForLoopGroup) {
+ for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
+ for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
+ eventLoopGroup.shutdownGracefully();
+ }
+
+ eventLoopGroupQueue.clear();
+ }
+ eventLoopGroupPool.clear();
+ }
+ }
+
+ static class CleanUpHandler extends Thread {
+
+ @Override
+ public void run() {
+ RpcChannelFactory.shutdownGracefully();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java
new file mode 100644
index 0000000..4d72536
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+/**
+ * Event listener for netty code. Users can subscribe events by using this interface.
+ */
+public interface RpcEventListener {
+
+ /**
+ * Performs actions before start.
+ * @param obj Method caller
+ */
+ public void onBeforeStart(Object obj);
+
+ /**
+ * Performs actions after start.
+ * @param obj Method caller
+ */
+ public void onAfterStart(Object obj);
+
+ /**
+ * Performs actions before initialization.
+ * @param obj Method caller
+ */
+ public void onBeforeInit(Object obj);
+
+ /**
+ * Performs actions after initialization.
+ * @param obj Method caller
+ */
+ public void onAfterInit(Object obj);
+
+ /**
+ * Performs actions before shutdown.
+ * @param obj Method caller
+ */
+ public void onBeforeShutdown(Object obj);
+
+ /**
+ * Performs actions after shutdown.
+ * @param obj Method caller
+ */
+ public void onAfterShutdown(Object obj);
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
new file mode 100644
index 0000000..152d426
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RpcUtils {
+
+ public static String normalizeInetSocketAddress(InetSocketAddress addr) {
+ return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ }
+
+ /**
+ * Util method to build socket addr from either:
+ * <host>
+ * <host>:<port>
+ * <fs>://<host>:<port>/<path>
+ */
+ public static InetSocketAddress createSocketAddr(String host, int port) {
+ return new InetSocketAddress(host, port);
+ }
+
+ /**
+ * Returns InetSocketAddress that a client can use to
+ * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+ * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+ * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+ *
+ * @param addr of a listener
+ * @return socket address that a client can use to connect to the server.
+ */
+ public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+ if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+ try {
+ addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+ } catch (UnknownHostException uhe) {
+ // shouldn't get here unless the host doesn't have a loopback iface
+ addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+ }
+ }
+ InetSocketAddress canonicalAddress =
+ new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort());
+ return canonicalAddress;
+ }
+
+ public static InetSocketAddress createUnresolved(String addr) {
+ String [] splitted = addr.split(":");
+ return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+ }
+
+ public static class Timer {
+ private long remaining;
+ private long prev;
+ public Timer(long timeout) {
+ this.remaining = timeout;
+ this.prev = System.currentTimeMillis();
+ }
+
+ public boolean isTimedOut() {
+ return remaining <= 0;
+ }
+
+ public void elapsed() {
+ long current = System.currentTimeMillis();
+ remaining -= (prev - current);
+ prev = current;
+ }
+
+ public void interval(long wait) {
+ if (wait <= 0 || isTimedOut()) {
+ return;
+ }
+ try {
+ Thread.sleep(Math.min(remaining, wait));
+ } catch (Exception ex) {
+ // ignore
+ }
+ }
+
+ public long remaining() {
+ return remaining;
+ }
+ }
+
+ public static class Scrutineer<T> {
+
+ private final AtomicReference<T> reference = new AtomicReference<T>();
+
+ T check(T ticket) {
+ T granted = reference.get();
+ for (;granted == null; granted = reference.get()) {
+ if (reference.compareAndSet(null, ticket)) {
+ return ticket;
+ }
+ }
+ return granted;
+ }
+
+ boolean clear(T granted) {
+ return reference.compareAndSet(granted, null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/pom.xml b/tajo-rpc/tajo-rpc-protobuf/pom.xml
new file mode 100644
index 0000000..1f67255
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/pom.xml
@@ -0,0 +1,274 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>tajo-project</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <groupId>org.apache.tajo</groupId>
+ <relativePath>../../tajo-project</relativePath>
+ </parent>
+ <packaging>jar</packaging>
+ <artifactId>tajo-rpc-protobuf</artifactId>
+ <name>Tajo Protocol Buffer Rpc</name>
+ <description>RPC Server/Client Implementation based on Netty and Protocol Buffer</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ </configuration>
+ <executions>
+ <execution>
+ <id>create-jar</id>
+ <phase>prepare-package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-protobuf-generated-sources-directory</id>
+ <phase>initialize</phase>
+ <configuration>
+ <target>
+ <mkdir dir="target/generated-sources/proto" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <executable>protoc</executable>
+ <arguments>
+ <argument>-Isrc/main/proto/</argument>
+ <argument>--java_out=target/generated-sources/proto</argument>
+ <argument>src/main/proto/DummyProtos.proto</argument>
+ <argument>src/main/proto/RpcProtos.proto</argument>
+ <argument>src/main/proto/TestProtos.proto</argument>
+ <argument>src/main/proto/TestProtocol.proto</argument>
+ </arguments>
+ </configuration>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/proto</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-rpc-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <repositories>
+ <repository>
+ <id>repository.jboss.org</id>
+ <url>https://repository.jboss.org/nexus/content/repositories/releases/
+ </url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <profiles>
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>dist</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>dist</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <echo file="${project.build.directory}/dist-layout-stitching.sh">
+ run() {
+ echo "\$ ${@}"
+ "${@}"
+ res=$?
+ if [ $res != 0 ]; then
+ echo
+ echo "Failed!"
+ echo
+ exit $res
+ fi
+ }
+
+ ROOT=`cd ${basedir}/..;pwd`
+ echo
+ echo "Current directory `pwd`"
+ echo
+ run rm -rf ${project.artifactId}-${project.version}
+ run mkdir ${project.artifactId}-${project.version}
+ run cd ${project.artifactId}-${project.version}
+ run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+ echo
+ echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+ echo
+ </echo>
+ <exec executable="sh" dir="${project.build.directory}" failonerror="true">
+ <arg line="./dist-layout-stitching.sh" />
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ </plugin>
+ </plugins>
+ </reporting>
+
+</project>
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
new file mode 100644
index 0000000..3d856ce
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.*;
+
+import io.netty.channel.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class AsyncRpcClient extends NettyClientBase {
+ private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
+
+ private final ConcurrentMap<Integer, ResponseCallback> requests =
+ new ConcurrentHashMap<Integer, ResponseCallback>();
+
+ private final Method stubMethod;
+ private final ProxyRpcChannel rpcChannel;
+ private final ClientChannelInboundHandler inboundHandler;
+
+ /**
+ * Intentionally make this method package-private, avoiding user directly
+ * new an instance through this constructor.
+ */
+ AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+ throws ClassNotFoundException, NoSuchMethodException {
+ super(rpcConnectionKey, retries);
+ stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
+ rpcChannel = new ProxyRpcChannel();
+ inboundHandler = new ClientChannelInboundHandler();
+ init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
+ }
+
+ @Override
+ public <T> T getStub() {
+ return getStub(stubMethod, rpcChannel);
+ }
+
+ protected void sendExceptions(String message) {
+ for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
+ ResponseCallback callback = callbackEntry.getValue();
+ Integer id = callbackEntry.getKey();
+
+ RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
+ .setErrorMessage(message)
+ .setId(id);
+
+ callback.run(responseBuilder.build());
+ }
+ }
+
+ @Override
+ public void close() {
+ sendExceptions("AsyncRpcClient terminates all the connections");
+
+ super.close();
+ }
+
+ private class ProxyRpcChannel implements RpcChannel {
+
+ public void callMethod(final MethodDescriptor method,
+ final RpcController controller,
+ final Message param,
+ final Message responseType,
+ RpcCallback<Message> done) {
+
+ int nextSeqId = sequence.getAndIncrement();
+
+ Message rpcRequest = buildRequest(nextSeqId, method, param);
+
+ inboundHandler.registerCallback(nextSeqId,
+ new ResponseCallback(controller, responseType, done));
+
+ ChannelPromise channelPromise = getChannel().newPromise();
+ channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
+ }
+ }
+ });
+ getChannel().writeAndFlush(rpcRequest, channelPromise);
+ }
+
+ private Message buildRequest(int seqId,
+ MethodDescriptor method,
+ Message param) {
+
+ RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+ .setId(seqId)
+ .setMethodName(method.getName());
+
+ if (param != null) {
+ requestBuilder.setRequestMessage(param.toByteString());
+ }
+
+ return requestBuilder.build();
+ }
+ }
+
+ private class ResponseCallback implements RpcCallback<RpcResponse> {
+ private final RpcController controller;
+ private final Message responsePrototype;
+ private final RpcCallback<Message> callback;
+
+ public ResponseCallback(RpcController controller,
+ Message responsePrototype,
+ RpcCallback<Message> callback) {
+ this.controller = controller;
+ this.responsePrototype = responsePrototype;
+ this.callback = callback;
+ }
+
+ @Override
+ public void run(RpcResponse rpcResponse) {
+ // if hasErrorMessage is true, it means rpc-level errors.
+ // it does not call the callback function\
+ if (rpcResponse.hasErrorMessage()) {
+ if (controller != null) {
+ this.controller.setFailed(rpcResponse.getErrorMessage());
+ }
+ callback.run(null);
+ } else { // if rpc call succeed
+ try {
+ Message responseMessage;
+ if (!rpcResponse.hasResponseMessage()) {
+ responseMessage = null;
+ } else {
+ responseMessage = responsePrototype.newBuilderForType().mergeFrom(
+ rpcResponse.getResponseMessage()).build();
+ }
+
+ callback.run(responseMessage);
+
+ } catch (InvalidProtocolBufferException e) {
+ throw new RemoteException(getErrorMessage(""), e);
+ }
+ }
+ }
+ }
+
+ private String getErrorMessage(String message) {
+ return "Exception [" + protocol.getCanonicalName() +
+ "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
+ getChannel().remoteAddress()) + ")]: " + message;
+ }
+
+ @ChannelHandler.Sharable
+ private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
+
+ void registerCallback(int seqId, ResponseCallback callback) {
+
+ if (requests.putIfAbsent(seqId, callback) != null) {
+ throw new RemoteException(
+ getErrorMessage("Duplicate Sequence Id "+ seqId));
+ }
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+ if (msg instanceof RpcResponse) {
+ try {
+ RpcResponse response = (RpcResponse) msg;
+ ResponseCallback callback = requests.remove(response.getId());
+
+ if (callback == null) {
+ LOG.warn("Dangling rpc call");
+ } else {
+ callback.run(response);
+ }
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause);
+
+ sendExceptions(cause.getMessage());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.error(cause.getMessage(), cause);
+ } else {
+ LOG.error("RPC Exception:" + cause.getMessage());
+ }
+
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
new file mode 100644
index 0000000..3b5a747
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+
+import io.netty.channel.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+public class AsyncRpcServer extends NettyServerBase {
+ private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
+
+ private final Service service;
+ private final ChannelInitializer<Channel> initializer;
+
+ public AsyncRpcServer(final Class<?> protocol,
+ final Object instance,
+ final InetSocketAddress bindAddress,
+ final int workerNum)
+ throws Exception {
+ super(protocol.getSimpleName(), bindAddress);
+
+ String serviceClassName = protocol.getName() + "$" +
+ protocol.getSimpleName() + "Service";
+ Class<?> serviceClass = Class.forName(serviceClassName);
+ Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface");
+ Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
+ this.service = (Service) method.invoke(null, instance);
+
+ this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+ super.init(this.initializer, workerNum);
+ }
+
+ @ChannelHandler.Sharable
+ private class ServerHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.add(ctx.channel());
+ if(LOG.isDebugEnabled()){
+ LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+ }
+ super.channelRegistered(ctx);
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.remove(ctx.channel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+ }
+ super.channelUnregistered(ctx);
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+ if (msg instanceof RpcRequest) {
+ try {
+ final RpcRequest request = (RpcRequest) msg;
+
+ String methodName = request.getMethodName();
+ MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+ if (methodDescriptor == null) {
+ throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+ }
+
+ Message paramProto = null;
+ if (request.hasRequestMessage()) {
+ try {
+ paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+ .mergeFrom(request.getRequestMessage()).build();
+ } catch (Throwable t) {
+ throw new RemoteCallException(request.getId(), methodDescriptor, t);
+ }
+ }
+
+ final RpcController controller = new NettyRpcController();
+
+ RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() {
+
+ public void run(Message returnValue) {
+
+ RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+
+ if (returnValue != null) {
+ builder.setResponseMessage(returnValue.toByteString());
+ }
+
+ if (controller.failed()) {
+ builder.setErrorMessage(controller.errorText());
+ }
+
+ ctx.writeAndFlush(builder.build());
+ }
+ };
+
+ service.callMethod(methodDescriptor, controller, paramProto, callback);
+
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception{
+ if (cause instanceof RemoteCallException) {
+ RemoteCallException callException = (RemoteCallException) cause;
+ ctx.writeAndFlush(callException.getResponse());
+ } else {
+ LOG.error(cause.getMessage());
+ }
+
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
+ }
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
new file mode 100644
index 0000000..6a90330
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+
+import io.netty.channel.*;
+import io.netty.util.concurrent.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.Future;
+
+public class BlockingRpcClient extends NettyClientBase {
+ private static final Log LOG = LogFactory.getLog(RpcProtos.class);
+
+ private final Map<Integer, ProtoCallFuture> requests =
+ new ConcurrentHashMap<Integer, ProtoCallFuture>();
+
+ private final Method stubMethod;
+ private final ProxyRpcChannel rpcChannel;
+ private final ChannelInboundHandlerAdapter inboundHandler;
+
+ /**
+ * Intentionally make this method package-private, avoiding user directly
+ * new an instance through this constructor.
+ */
+ BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+ throws ClassNotFoundException, NoSuchMethodException {
+ super(rpcConnectionKey, retries);
+ stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
+ rpcChannel = new ProxyRpcChannel();
+ inboundHandler = new ClientChannelInboundHandler();
+ init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
+ }
+
+ @Override
+ public <T> T getStub() {
+ return getStub(stubMethod, rpcChannel);
+ }
+
+ @Override
+ public void close() {
+ for(ProtoCallFuture callback: requests.values()) {
+ callback.setFailed("BlockingRpcClient terminates all the connections",
+ new ServiceException("BlockingRpcClient terminates all the connections"));
+ }
+
+ super.close();
+ }
+
+ private class ProxyRpcChannel implements BlockingRpcChannel {
+
+ @Override
+ public Message callBlockingMethod(final MethodDescriptor method,
+ final RpcController controller,
+ final Message param,
+ final Message responsePrototype)
+ throws TajoServiceException {
+
+ int nextSeqId = sequence.getAndIncrement();
+
+ Message rpcRequest = buildRequest(nextSeqId, method, param);
+
+ ProtoCallFuture callFuture =
+ new ProtoCallFuture(controller, responsePrototype);
+ requests.put(nextSeqId, callFuture);
+
+ ChannelPromise channelPromise = getChannel().newPromise();
+ channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
+ }
+ }
+ });
+ getChannel().writeAndFlush(rpcRequest, channelPromise);
+
+ try {
+ return callFuture.get(60, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ if (t instanceof ExecutionException) {
+ Throwable cause = t.getCause();
+ if (cause != null && cause instanceof TajoServiceException) {
+ throw (TajoServiceException)cause;
+ }
+ }
+ throw new TajoServiceException(t.getMessage());
+ }
+ }
+
+ private Message buildRequest(int seqId,
+ MethodDescriptor method,
+ Message param) {
+ RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
+ .setId(seqId)
+ .setMethodName(method.getName());
+
+ if (param != null) {
+ requestBuilder.setRequestMessage(param.toByteString());
+ }
+
+ return requestBuilder.build();
+ }
+ }
+
+ private String getErrorMessage(String message) {
+ if(getChannel() != null) {
+ return protocol.getName() +
+ "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
+ getChannel().remoteAddress()) + "): " + message;
+ } else {
+ return "Exception " + message;
+ }
+ }
+
+ private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
+ if(getChannel() != null) {
+ return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
+ RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
+ } else {
+ return new TajoServiceException(response.getErrorMessage());
+ }
+ }
+
+ @ChannelHandler.Sharable
+ private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+
+ if (msg instanceof RpcResponse) {
+ try {
+ RpcResponse rpcResponse = (RpcResponse) msg;
+ ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+
+ if (callback == null) {
+ LOG.warn("Dangling rpc call");
+ } else {
+ if (rpcResponse.hasErrorMessage()) {
+ callback.setFailed(rpcResponse.getErrorMessage(),
+ makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
+ throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
+ } else {
+ Message responseMessage;
+
+ if (!rpcResponse.hasResponseMessage()) {
+ responseMessage = null;
+ } else {
+ responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
+ .build();
+ }
+
+ callback.setResponse(responseMessage);
+ }
+ }
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ for(ProtoCallFuture callback: requests.values()) {
+ callback.setFailed(cause.getMessage(), cause);
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.error("" + cause.getMessage(), cause);
+ } else {
+ LOG.error("RPC Exception:" + cause.getMessage());
+ }
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
+ }
+ }
+ }
+
+ static class ProtoCallFuture implements Future<Message> {
+ private Semaphore sem = new Semaphore(0);
+ private Message response = null;
+ private Message returnType;
+
+ private RpcController controller;
+
+ private ExecutionException ee;
+
+ public ProtoCallFuture(RpcController controller, Message message) {
+ this.controller = controller;
+ this.returnType = message;
+ }
+
+ @Override
+ public boolean cancel(boolean arg0) {
+ return false;
+ }
+
+ @Override
+ public Message get() throws InterruptedException, ExecutionException {
+ sem.acquire();
+ if(ee != null) {
+ throw ee;
+ }
+ return response;
+ }
+
+ @Override
+ public Message get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if(sem.tryAcquire(timeout, unit)) {
+ if (ee != null) {
+ throw ee;
+ }
+ return response;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return sem.availablePermits() > 0;
+ }
+
+ public void setResponse(Message response) {
+ this.response = response;
+ sem.release();
+ }
+
+ public void setFailed(String errorText, Throwable t) {
+ if(controller != null) {
+ this.controller.setFailed(errorText);
+ }
+ ee = new ExecutionException(errorText, t);
+ sem.release();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
new file mode 100644
index 0000000..0ce359f
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+
+import io.netty.channel.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcProtos.RpcRequest;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import io.netty.util.ReferenceCountUtil;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+public class BlockingRpcServer extends NettyServerBase {
+ private static Log LOG = LogFactory.getLog(BlockingRpcServer.class);
+ private final BlockingService service;
+ private final ChannelInitializer<Channel> initializer;
+
+ public BlockingRpcServer(final Class<?> protocol,
+ final Object instance,
+ final InetSocketAddress bindAddress,
+ final int workerNum)
+ throws Exception {
+
+ super(protocol.getSimpleName(), bindAddress);
+
+ String serviceClassName = protocol.getName() + "$" +
+ protocol.getSimpleName() + "Service";
+ Class<?> serviceClass = Class.forName(serviceClassName);
+ Class<?> interfaceClass = Class.forName(serviceClassName +
+ "$BlockingInterface");
+ Method method = serviceClass.getMethod(
+ "newReflectiveBlockingService", interfaceClass);
+
+ this.service = (BlockingService) method.invoke(null, instance);
+ this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+
+ super.init(this.initializer, workerNum);
+ }
+
+ @ChannelHandler.Sharable
+ private class ServerHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.add(ctx.channel());
+ if(LOG.isDebugEnabled()){
+ LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+ }
+ super.channelRegistered(ctx);
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.remove(ctx.channel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+ }
+ super.channelUnregistered(ctx);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+
+ if (msg instanceof RpcRequest) {
+ try {
+ final RpcRequest request = (RpcRequest) msg;
+
+ String methodName = request.getMethodName();
+ MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+ if (methodDescriptor == null) {
+ throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+ }
+ Message paramProto = null;
+ if (request.hasRequestMessage()) {
+ try {
+ paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+ .mergeFrom(request.getRequestMessage()).build();
+
+ } catch (Throwable t) {
+ throw new RemoteCallException(request.getId(), methodDescriptor, t);
+ }
+ }
+ Message returnValue;
+ RpcController controller = new NettyRpcController();
+
+ try {
+ returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
+ } catch (Throwable t) {
+ throw new RemoteCallException(request.getId(), methodDescriptor, t);
+ }
+
+ RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+
+ if (returnValue != null) {
+ builder.setResponseMessage(returnValue.toByteString());
+ }
+
+ if (controller.failed()) {
+ builder.setErrorMessage(controller.errorText());
+ }
+ ctx.writeAndFlush(builder.build());
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof RemoteCallException) {
+ RemoteCallException callException = (RemoteCallException) cause;
+ ctx.writeAndFlush(callException.getResponse());
+ }
+
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
+ }
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
new file mode 100644
index 0000000..c4c3256
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class CallFuture<T> implements RpcCallback<T>, Future<T> {
+
+ private final Semaphore sem = new Semaphore(0);
+ private boolean done = false;
+ private T response;
+ private RpcController controller;
+
+ public CallFuture() {
+ controller = new DefaultRpcController();
+ }
+
+ public RpcController getController() {
+ return controller;
+ }
+
+ @Override
+ public void run(T t) {
+ this.response = t;
+ done = true;
+ sem.release();
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ controller.startCancel();
+ sem.release();
+ return controller.isCanceled();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return controller.isCanceled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public T get() throws InterruptedException {
+ sem.acquire();
+
+ return response;
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException {
+ if (sem.tryAcquire(timeout, unit)) {
+ return response;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
new file mode 100644
index 0000000..4ba19a5
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+public class DefaultRpcController implements RpcController {
+ private String errorText;
+ private boolean error;
+ private boolean canceled;
+
+ @Override
+ public void reset() {
+ errorText = "";
+ error = false;
+ canceled = false;
+ }
+
+ @Override
+ public boolean failed() {
+ return error;
+ }
+
+ @Override
+ public String errorText() {
+ return errorText;
+ }
+
+ @Override
+ public void startCancel() {
+ this.canceled = true;
+ }
+
+ @Override
+ public void setFailed(String s) {
+ this.errorText = s;
+ this.error = true;
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return canceled;
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
new file mode 100644
index 0000000..72278f2
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class NettyClientBase implements Closeable {
+ private static final Log LOG = LogFactory.getLog(NettyClientBase.class);
+ private static final int CONNECTION_TIMEOUT = 60000; // 60 sec
+ private static final long PAUSE = 1000; // 1 sec
+
+ private final int numRetries;
+
+ private Bootstrap bootstrap;
+ private volatile ChannelFuture channelFuture;
+
+ protected final Class<?> protocol;
+ protected final AtomicInteger sequence = new AtomicInteger(0);
+
+ private final RpcConnectionKey key;
+ private final AtomicInteger counter = new AtomicInteger(0); // reference counter
+
+ public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
+ throws ClassNotFoundException, NoSuchMethodException {
+ this.key = rpcConnectionKey;
+ this.protocol = rpcConnectionKey.protocolClass;
+ this.numRetries = numRetries;
+ }
+
+ // should be called from sub class
+ protected void init(ChannelInitializer<Channel> initializer) {
+ this.bootstrap = new Bootstrap();
+ this.bootstrap
+ .channel(NioSocketChannel.class)
+ .handler(initializer)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT)
+ .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
+ .option(ChannelOption.TCP_NODELAY, true);
+ }
+
+ public RpcConnectionPool.RpcConnectionKey getKey() {
+ return key;
+ }
+
+ protected final Class<?> getServiceClass() throws ClassNotFoundException {
+ String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+ return Class.forName(serviceClassName);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected final <T> T getStub(Method stubMethod, Object rpcChannel) {
+ try {
+ return (T) stubMethod.invoke(null, rpcChannel);
+ } catch (Exception e) {
+ throw new RemoteException(e.getMessage(), e);
+ }
+ }
+
+ public abstract <T> T getStub();
+
+ public boolean acquire(long timeout) {
+ if (!checkConnection(timeout)) {
+ return false;
+ }
+ counter.incrementAndGet();
+ return true;
+ }
+
+ public boolean release() {
+ return counter.decrementAndGet() == 0;
+ }
+
+ private boolean checkConnection(long timeout) {
+ if (isConnected()) {
+ return true;
+ }
+
+ InetSocketAddress addr = key.addr;
+ if (addr.isUnresolved()) {
+ addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+ }
+
+ return handleConnectionInternally(addr, timeout);
+ }
+
+ private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+ LOG.warn("Try to connect : " + address);
+ this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
+ .connect(address)
+ .addListener(listener);
+ }
+
+ // first attendant kicks connection
+ private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>();
+
+ private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) {
+ final CountDownLatch ticket = new CountDownLatch(1);
+ final CountDownLatch granted = connect.check(ticket);
+
+ // basically, it's double checked lock
+ if (ticket == granted && isConnected()) {
+ granted.countDown();
+ return true;
+ }
+
+ if (ticket == granted) {
+ connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
+ }
+
+ try {
+ granted.await(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ boolean success = channelFuture.isSuccess();
+
+ if (granted.getCount() == 0) {
+ connect.clear(granted);
+ }
+
+ return success;
+ }
+
+ class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
+ private final AtomicInteger retryCount = new AtomicInteger();
+ private final InetSocketAddress address;
+ private final CountDownLatch latch;
+
+ RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) {
+ this.address = address;
+ this.latch = latch;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ if (!channelFuture.isSuccess()) {
+ channelFuture.channel().close();
+
+ if (numRetries > retryCount.getAndIncrement()) {
+ final GenericFutureListener<ChannelFuture> currentListener = this;
+
+ RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() {
+ @Override
+ public void run() {
+ connectUsingNetty(address, currentListener);
+ }
+ }, PAUSE, TimeUnit.MILLISECONDS);
+
+ LOG.debug("Connecting to " + address + " has been failed. Retrying to connect.");
+ }
+ else {
+ latch.countDown();
+
+ LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
+ }
+ }
+ else {
+ latch.countDown();
+ }
+ }
+ }
+
+ public Channel getChannel() {
+ return channelFuture == null ? null : channelFuture.channel();
+ }
+
+ public boolean isConnected() {
+ Channel channel = getChannel();
+ return channel != null && channel.isOpen() && channel.isActive();
+ }
+
+ public SocketAddress getRemoteAddress() {
+ Channel channel = getChannel();
+ return channel == null ? null : channel.remoteAddress();
+ }
+
+ @Override
+ public void close() {
+ Channel channel = getChannel();
+ if (channel != null && channel.isOpen()) {
+ LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+ channel.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
new file mode 100644
index 0000000..b7f4537
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+public class NettyRpcController implements RpcController {
+ private String errorText;
+
+ @Override
+ public void reset() {
+ errorText = null;
+ }
+
+ @Override
+ public boolean failed() {
+ return errorText != null;
+ }
+
+ @Override
+ public String errorText() {
+ return errorText;
+ }
+
+ @Override
+ public void startCancel() {
+ // TODO - to be implemented
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFailed(String s) {
+ errorText = s;
+ }
+
+ @Override
+ public boolean isCanceled() {
+ // TODO - to be implemented
+ return false;
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
new file mode 100644
index 0000000..9b7f8ac
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+
+public class NullCallback implements RpcCallback<Object> {
+ private final static NullCallback instance;
+
+ static {
+ instance = new NullCallback();
+ }
+
+ public static RpcCallback get() {
+ return instance;
+ }
+
+ @Override
+ public void run(Object parameter) {
+ // do nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
new file mode 100644
index 0000000..6a340dc
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+
+import com.google.protobuf.MessageLite;
+
+class ProtoChannelInitializer extends ChannelInitializer<Channel> {
+ private final MessageLite defaultInstance;
+ private final ChannelHandler handler;
+
+ public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
+ this.handler = handler;
+ this.defaultInstance = defaultInstance;
+ }
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
+ pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
+ pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
+ pipeline.addLast("protobufEncoder", new ProtobufEncoder());
+ pipeline.addLast("handler", handler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
new file mode 100644
index 0000000..52ef31a
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+
+public class RemoteCallException extends RemoteException {
+ private int seqId;
+ private String originExceptionClass;
+
+ public RemoteCallException(int seqId, MethodDescriptor methodDesc,
+ Throwable t) {
+ super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t);
+ this.seqId = seqId;
+ if (t != null) {
+ originExceptionClass = t.getClass().getCanonicalName();
+ }
+ }
+
+ public RemoteCallException(int seqId, Throwable t) {
+ super(t);
+ this.seqId = seqId;
+ if (t != null) {
+ originExceptionClass = t.getClass().getCanonicalName();
+ }
+ }
+
+ public RpcResponse getResponse() {
+ RpcResponse.Builder builder = RpcResponse.newBuilder();
+ builder.setId(seqId);
+ if (getCause().getMessage() == null) {
+ builder.setErrorMessage(getCause().getClass().getName());
+ } else {
+ builder.setErrorMessage(getCause().getMessage());
+ }
+ builder.setErrorTrace(getStackTraceString(getCause()));
+ builder.setErrorClass(originExceptionClass);
+
+ return builder.build();
+ }
+
+ private static String getStackTraceString(Throwable aThrowable) {
+ final Writer result = new StringWriter();
+ final PrintWriter printWriter = new PrintWriter(result);
+ aThrowable.printStackTrace(printWriter);
+ return result.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java
new file mode 100644
index 0000000..30c110d
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+public class RemoteException extends RuntimeException {
+ public RemoteException() {
+ super();
+ }
+
+ public RemoteException(String message) {
+ super(message);
+ }
+
+ public RemoteException(Throwable t) {
+ super(t);
+ }
+
+ public RemoteException(String message, Throwable t) {
+ super(message, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
new file mode 100644
index 0000000..3c054ad
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+public class RetriesExhaustedException extends RuntimeException {
+ private static final long serialVersionUID = 1876775844L;
+
+ public RetriesExhaustedException(final String msg) {
+ super(msg);
+ }
+
+ public RetriesExhaustedException(final String msg, final IOException e) {
+ super(msg, e);
+ }
+
+ /**
+ * Datastructure that allows adding more info around Throwable incident.
+ */
+ public static class ThrowableWithExtraContext {
+ private final Throwable t;
+ private final long when;
+ private final String extras;
+
+ public ThrowableWithExtraContext(final Throwable t, final long when,
+ final String extras) {
+ this.t = t;
+ this.when = when;
+ this.extras = extras;
+ }
+
+ @Override
+ public String toString() {
+ return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
+ }
+ }
+
+ /**
+ * Create a new RetriesExhaustedException from the list of prior failures.
+ * @param callableVitals Details from the {@link ServerCallable} we were using
+ * when we got this exception.
+ * @param numTries The number of tries we made
+ * @param exceptions List of exceptions that failed before giving up
+ */
+ public RetriesExhaustedException(final String callableVitals, int numTries,
+ List<Throwable> exceptions) {
+ super(getMessage(callableVitals, numTries, exceptions));
+ }
+
+ /**
+ * Create a new RetriesExhaustedException from the list of prior failures.
+ * @param numTries
+ * @param exceptions List of exceptions that failed before giving up
+ */
+ public RetriesExhaustedException(final int numTries,
+ final List<Throwable> exceptions) {
+ super(getMessage(numTries, exceptions));
+ }
+
+ private static String getMessage(String callableVitals, int numTries,
+ List<Throwable> exceptions) {
+ StringBuilder buffer = new StringBuilder("Failed contacting ");
+ buffer.append(callableVitals);
+ buffer.append(" after ");
+ buffer.append(numTries + 1);
+ buffer.append(" attempts.\nExceptions:\n");
+ for (Throwable t : exceptions) {
+ buffer.append(t.toString());
+ buffer.append("\n");
+ }
+ return buffer.toString();
+ }
+
+ private static String getMessage(final int numTries,
+ final List<Throwable> exceptions) {
+ StringBuilder buffer = new StringBuilder("Failed after attempts=");
+ buffer.append(numTries + 1);
+ buffer.append(", exceptions:\n");
+ for (Throwable t : exceptions) {
+ buffer.append(t.toString());
+ buffer.append("\n");
+ }
+ return buffer.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
new file mode 100644
index 0000000..6d1f479
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.util.internal.logging.CommonsLoggerFactory;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RpcConnectionPool {
+ private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
+
+ private Map<RpcConnectionKey, NettyClientBase> connections =
+ new HashMap<RpcConnectionKey, NettyClientBase>();
+
+ private static RpcConnectionPool instance;
+ private final Object lockObject = new Object();
+
+ public final static int RPC_RETRIES = 3;
+
+ private RpcConnectionPool() {
+ }
+
+ public synchronized static RpcConnectionPool getPool() {
+ if(instance == null) {
+ InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
+ instance = new RpcConnectionPool();
+ }
+ return instance;
+ }
+
+ private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+ NettyClientBase client;
+ if(rpcConnectionKey.asyncMode) {
+ client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES);
+ } else {
+ client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES);
+ }
+ return client;
+ }
+
+ public static final long DEFAULT_TIMEOUT = 3000;
+ public static final long DEFAULT_INTERVAL = 500;
+
+ public NettyClientBase getConnection(InetSocketAddress addr,
+ Class<?> protocolClass, boolean asyncMode)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+ return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL);
+ }
+
+ public NettyClientBase getConnection(InetSocketAddress addr,
+ Class<?> protocolClass, boolean asyncMode, long timeout, long interval)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+ RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
+
+ RpcUtils.Timer timer = new RpcUtils.Timer(timeout);
+ for (; !timer.isTimedOut(); timer.elapsed()) {
+ NettyClientBase client;
+ synchronized (lockObject) {
+ client = connections.get(key);
+ if (client == null) {
+ connections.put(key, client = makeConnection(key));
+ }
+ }
+ if (client.acquire(timer.remaining())) {
+ return client;
+ }
+ timer.interval(interval);
+ }
+
+ throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec");
+ }
+
+ public void releaseConnection(NettyClientBase client) {
+ release(client, false);
+ }
+
+ public void closeConnection(NettyClientBase client) {
+ release(client, true);
+ }
+
+ private void release(NettyClientBase client, boolean close) {
+ if (client == null) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Close connection [" + client.getKey() + "]");
+ }
+ try {
+ if (returnToPool(client, close)) {
+ client.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Current Connections [" + connections.size() + "]");
+ }
+ } catch (Exception e) {
+ LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
+ }
+ }
+
+ // return true if the connection should be closed
+ private boolean returnToPool(NettyClientBase client, boolean close) {
+ synchronized (lockObject) {
+ if (client.release() && (close || !client.isConnected())) {
+ connections.remove(client.getKey());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void close() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Pool Closed");
+ }
+
+ synchronized (lockObject) {
+ for (NettyClientBase eachClient : connections.values()) {
+ try {
+ eachClient.close();
+ } catch (Exception e) {
+ LOG.error("close client pool error", e);
+ }
+ }
+ connections.clear();
+ }
+ }
+
+ public void shutdown(){
+ close();
+ RpcChannelFactory.shutdownGracefully();
+ }
+
+ static class RpcConnectionKey {
+ final InetSocketAddress addr;
+ final Class<?> protocolClass;
+ final boolean asyncMode;
+
+ final String description;
+
+ public RpcConnectionKey(InetSocketAddress addr,
+ Class<?> protocolClass, boolean asyncMode) {
+ this.addr = addr;
+ this.protocolClass = protocolClass;
+ this.asyncMode = asyncMode;
+ this.description = "["+ protocolClass + "] " + addr + "," + asyncMode;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(!(obj instanceof RpcConnectionKey)) {
+ return false;
+ }
+
+ return toString().equals(obj.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ return description.hashCode();
+ }
+ }
+}