You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/18 15:57:58 UTC
[4/5] tajo git commit: TAJO-1337: Implements common modules to handle
RESTful API
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
deleted file mode 100644
index ed6b634..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Static factory for the Netty event loop groups used by RPC clients and servers.
- *
- * Client groups are created once per {@link ClientChannelId}, kept in a shared pool and
- * handed out round-robin so that all clients reuse the same threads. Server bootstraps
- * are created on demand and are owned by the caller.
- */
-public final class RpcChannelFactory {
-  private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
-
-  /** Worker thread count used when the caller does not supply one. */
-  private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
-
-  /** Monitor held while the shared client pool is looked up, refilled or shut down. */
-  private static final Object lockObjectForLoopGroup = new Object();
-  /** Source of the numeric suffix appended to server names. */
-  private static final AtomicInteger serverCount = new AtomicInteger(0);
-
-  public enum ClientChannelId {
-    CLIENT_DEFAULT,
-    FETCHER
-  }
-
-  /** How many event loop groups are created up front for each client id. */
-  private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
-      new ConcurrentHashMap<ClientChannelId, Integer>();
-  /** Shared client event loop groups, one queue per client id. */
-  private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
-      new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
-
-  private RpcChannelFactory(){
-  }
-
-  static {
-    // The hook calls shutdownGracefully() when the JVM exits.
-    Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
-
-    defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
-    defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
-  }
-
-  /**
-   * Returns the shared event loop group of {@link ClientChannelId#CLIENT_DEFAULT},
-   * sized with the default worker count.
-   *
-   * @return a pooled event loop group that is not shutting down
-   */
-  public static EventLoopGroup getSharedClientEventloopGroup() {
-    return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
-  }
-
-  /**
-   * Returns the shared event loop group of {@link ClientChannelId#CLIENT_DEFAULT}.
-   *
-   * @param workerNum The number of workers; only used when a new group has to be created
-   * @return a pooled event loop group that is not shutting down
-   */
-  public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
-    // shared worker pool
-    return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
-  }
-
-  /**
-   * Returns a pooled event loop group for the given client id. The queue of groups for
-   * the id is created on first use; the head of the queue is taken and appended again,
-   * so successive calls rotate through the groups of that id.
-   *
-   * @param clientId  pool key
-   * @param workerNum number of worker threads for any group that has to be created
-   * @return the selected group, or a freshly created one when the selected group was
-   *         missing or already shutting down
-   */
-  public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
-    Queue<EventLoopGroup> eventLoopGroupQueue;
-    EventLoopGroup returnEventLoopGroup;
-
-    synchronized (lockObjectForLoopGroup) {
-      eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
-      if (eventLoopGroupQueue == null) {
-        eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
-      }
-
-      returnEventLoopGroup = eventLoopGroupQueue.poll();
-      if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
-        // A group that is gone or going away is replaced; the old one is dropped from the queue.
-        returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
-      }
-      eventLoopGroupQueue.add(returnEventLoopGroup);
-    }
-
-    return returnEventLoopGroup;
-  }
-
-  /**
-   * @param eventLoopGroup group to inspect, may be {@code null}
-   * @return {@code true} when the group is {@code null} or reports that it is shutting down
-   */
-  protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
-    return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
-  }
-
-  /**
-   * Creates and registers the queue of event loop groups for a client id.
-   * Client must release the external resources.
-   *
-   * @param clientId  pool key under which the new queue is stored
-   * @param workerNum number of worker threads of every created group
-   * @return the queue that was stored in the pool
-   */
-  protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
-    Integer configuredCount = defaultMaxKeyPoolCount.get(clientId);
-    // An id without a configured size gets a single group instead of failing while unboxing null.
-    int defaultMaxObjectCount = (configuredCount == null) ? 1 : configuredCount;
-    Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
-    eventLoopGroupPool.put(clientId, loopGroupQueue);
-
-    for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
-      loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
-    }
-
-    return loopGroupQueue;
-  }
-
-  /**
-   * Creates one NIO event loop group whose threads are named {@code <name> Client #<n>}.
-   *
-   * @param name      prefix of the thread names
-   * @param workerNum number of threads in the group
-   * @return the new group
-   */
-  protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
-    }
-
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
-
-    return new NioEventLoopGroup(workerNum, clientFactory);
-  }
-
-  /**
-   * Creates a server bootstrap with a one-thread boss group and a worker group of the
-   * requested size. The groups are not pooled here.
-   * Client must release the external resources.
-   *
-   * @param name      server name; a running counter is appended to it for the thread names
-   * @param workerNum number of worker threads
-   * @return a bootstrap already configured with both groups
-   */
-  public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
-    name = name + "-" + serverCount.incrementAndGet();
-    if(LOG.isInfoEnabled()){
-      LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
-    }
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
-    ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
-
-    EventLoopGroup bossGroup =
-        new NioEventLoopGroup(1, bossFactory);
-    EventLoopGroup workerGroup =
-        new NioEventLoopGroup(workerNum, workerFactory);
-
-    return new ServerBootstrap().group(bossGroup, workerGroup);
-  }
-
-  /**
-   * Requests a graceful shutdown of every pooled client event loop group and empties the
-   * pool. Groups handed out by {@link #createServerChannelFactory(String, int)} are not touched.
-   */
-  public static void shutdownGracefully(){
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Shutdown Shared RPC Pool");
-    }
-
-    synchronized(lockObjectForLoopGroup) {
-      for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
-        for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
-          eventLoopGroup.shutdownGracefully();
-        }
-
-        eventLoopGroupQueue.clear();
-      }
-      eventLoopGroupPool.clear();
-    }
-  }
-
-  /** Shutdown hook body: shuts the shared client pool down. */
-  static class CleanUpHandler extends Thread {
-
-    @Override
-    public void run() {
-      RpcChannelFactory.shutdownGracefully();
-    }
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
deleted file mode 100644
index 6d1f479..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import io.netty.channel.ConnectTimeoutException;
-import io.netty.util.internal.logging.CommonsLoggerFactory;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Process-wide pool of RPC clients, keyed by remote address, protocol class and
- * sync/async mode. A client is created the first time its key is requested and is
- * removed from the pool when it is released for the last time and is either closed
- * explicitly or no longer connected.
- */
-public class RpcConnectionPool {
-  private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
-
-  /** Pooled clients; a plain HashMap, so every access is made while holding {@link #lockObject}. */
-  private final Map<RpcConnectionKey, NettyClientBase> connections =
-      new HashMap<RpcConnectionKey, NettyClientBase>();
-
-  private static RpcConnectionPool instance;
-  private final Object lockObject = new Object();
-
-  public final static int RPC_RETRIES = 3;
-
-  private RpcConnectionPool() {
-  }
-
-  /**
-   * Returns the singleton pool, creating it on first use. Creation also installs
-   * {@link CommonsLoggerFactory} as Netty's default internal logger factory.
-   */
-  public synchronized static RpcConnectionPool getPool() {
-    if(instance == null) {
-      InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
-      instance = new RpcConnectionPool();
-    }
-    return instance;
-  }
-
-  /** Creates an async or blocking client for the key, depending on {@code asyncMode}. */
-  private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    NettyClientBase client;
-    if(rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES);
-    } else {
-      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES);
-    }
-    return client;
-  }
-
-  /** Default total time, in milliseconds, to wait for a connection. */
-  public static final long DEFAULT_TIMEOUT = 3000;
-  /** Default pause, in milliseconds, between two acquire attempts. */
-  public static final long DEFAULT_INTERVAL = 500;
-
-  /**
-   * Same as {@link #getConnection(InetSocketAddress, Class, boolean, long, long)} with
-   * {@link #DEFAULT_TIMEOUT} and {@link #DEFAULT_INTERVAL}.
-   */
-  public NettyClientBase getConnection(InetSocketAddress addr,
-      Class<?> protocolClass, boolean asyncMode)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL);
-  }
-
-  /**
-   * Looks up (or creates) the client for the given key and tries to acquire it, pausing
-   * {@code interval} milliseconds between attempts until {@code timeout} is used up.
-   *
-   * @param addr          remote address
-   * @param protocolClass protocol class served at that address
-   * @param asyncMode     {@code true} for an async client, {@code false} for a blocking one
-   * @param timeout       total wait budget in milliseconds
-   * @param interval      pause between attempts in milliseconds
-   * @return an acquired client; hand it back with {@link #releaseConnection} or {@link #closeConnection}
-   * @throws ConnectTimeoutException when no attempt succeeded within the budget
-   */
-  public NettyClientBase getConnection(InetSocketAddress addr,
-      Class<?> protocolClass, boolean asyncMode, long timeout, long interval)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
-
-    RpcUtils.Timer timer = new RpcUtils.Timer(timeout);
-    for (; !timer.isTimedOut(); timer.elapsed()) {
-      NettyClientBase client;
-      synchronized (lockObject) {
-        client = connections.get(key);
-        if (client == null) {
-          connections.put(key, client = makeConnection(key));
-        }
-      }
-      // Acquire outside the lock so a slow connect does not block other keys.
-      if (client.acquire(timer.remaining())) {
-        return client;
-      }
-      timer.interval(interval);
-    }
-
-    throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec");
-  }
-
-  /** Hands a client back to the pool; it is closed only if it turned out to be disconnected. */
-  public void releaseConnection(NettyClientBase client) {
-    release(client, false);
-  }
-
-  /** Hands a client back to the pool and asks for it to be closed. */
-  public void closeConnection(NettyClientBase client) {
-    release(client, true);
-  }
-
-  /**
-   * Common implementation of release and close. A {@code null} client is ignored; any
-   * exception raised while closing is logged and not propagated.
-   */
-  private void release(NettyClientBase client, boolean close) {
-    if (client == null) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      // Say which operation was requested; a plain release does not necessarily close anything.
-      LOG.debug((close ? "Close" : "Release") + " connection [" + client.getKey() + "]");
-    }
-    try {
-      if (returnToPool(client, close)) {
-        client.close();
-      }
-      if (LOG.isDebugEnabled()) {
-        int poolSize;
-        synchronized (lockObject) {
-          // HashMap is not thread-safe, so even size() is read under the lock.
-          poolSize = connections.size();
-        }
-        LOG.debug("Current Connections [" + poolSize + "]");
-      }
-    } catch (Exception e) {
-      LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
-    }
-  }
-
-  // return true if the connection should be closed
-  private boolean returnToPool(NettyClientBase client, boolean close) {
-    synchronized (lockObject) {
-      if (client.release() && (close || !client.isConnected())) {
-        connections.remove(client.getKey());
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /** Closes every pooled client and empties the pool; close failures are logged per client. */
-  public void close() {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Pool Closed");
-    }
-
-    synchronized (lockObject) {
-      for (NettyClientBase eachClient : connections.values()) {
-        try {
-          eachClient.close();
-        } catch (Exception e) {
-          LOG.error("close client pool error", e);
-        }
-      }
-      connections.clear();
-    }
-  }
-
-  /** Closes the pool and then shuts the shared client event loop groups down. */
-  public void shutdown(){
-    close();
-    RpcChannelFactory.shutdownGracefully();
-  }
-
-  /**
-   * Pool key. Equality and hash code are both derived from the textual description
-   * built in the constructor from protocol class, address and mode.
-   */
-  static class RpcConnectionKey {
-    final InetSocketAddress addr;
-    final Class<?> protocolClass;
-    final boolean asyncMode;
-
-    final String description;
-
-    public RpcConnectionKey(InetSocketAddress addr,
-        Class<?> protocolClass, boolean asyncMode) {
-      this.addr = addr;
-      this.protocolClass = protocolClass;
-      this.asyncMode = asyncMode;
-      this.description = "["+ protocolClass + "] " + addr + "," + asyncMode;
-    }
-
-    @Override
-    public String toString() {
-      return description;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if(!(obj instanceof RpcConnectionKey)) {
-        return false;
-      }
-
-      return toString().equals(obj.toString());
-    }
-
-    @Override
-    public int hashCode() {
-      return description.hashCode();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
deleted file mode 100644
index 152d426..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/** Small helpers shared by the RPC client and server code. */
-public class RpcUtils {
-
-  /**
-   * Formats a resolved socket address as {@code <ip>:<port>}.
-   *
-   * @param addr a resolved address; an unresolved one carries no {@link InetAddress}
-   *             and makes this method throw {@link NullPointerException}
-   * @return the textual IP address and the port joined by a colon
-   */
-  public static String normalizeInetSocketAddress(InetSocketAddress addr) {
-    return addr.getAddress().getHostAddress() + ":" + addr.getPort();
-  }
-
-  /**
-   * Builds a socket address from a host and a port. The host string is passed to
-   * {@link InetSocketAddress#InetSocketAddress(String, int)} as is; it is not parsed
-   * for an embedded port or URI scheme.
-   *
-   * @param host host name or literal IP address
-   * @param port port number
-   * @return the new socket address
-   */
-  public static InetSocketAddress createSocketAddr(String host, int port) {
-    return new InetSocketAddress(host, port);
-  }
-
-  /**
-   * Returns InetSocketAddress that a client can use to
-   * connect to the server. NettyServerBase.getListenerAddress() is not correct when
-   * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
-   * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
-   *
-   * @param addr of a listener
-   * @return socket address that a client can use to connect to the server; an
-   *         unresolved {@code addr} is returned unchanged
-   */
-  public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
-    if (addr.isUnresolved()) {
-      // There is no InetAddress to inspect or canonicalize; without this guard the
-      // getAddress() call below would dereference null.
-      return addr;
-    }
-    if (addr.getAddress().isAnyLocalAddress()) {
-      try {
-        addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
-      } catch (UnknownHostException uhe) {
-        // shouldn't get here unless the host doesn't have a loopback iface
-        addr = new InetSocketAddress("127.0.0.1", addr.getPort());
-      }
-    }
-    return new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort());
-  }
-
-  /**
-   * Parses {@code host:port} into an unresolved socket address (no name lookup).
-   *
-   * @param addr text whose first two colon-separated parts are the host and the port
-   * @return the unresolved address
-   * @throws ArrayIndexOutOfBoundsException when {@code addr} contains no colon
-   * @throws NumberFormatException when the port part is not an integer
-   */
-  public static InetSocketAddress createUnresolved(String addr) {
-    String [] splitted = addr.split(":");
-    return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
-  }
-
-  /**
-   * Countdown of a millisecond budget based on {@link System#currentTimeMillis()}.
-   * The budget only shrinks when {@link #elapsed()} is called. Instances keep plain
-   * fields and are meant to be used by one thread.
-   */
-  public static class Timer {
-    private long remaining;
-    private long prev;
-
-    /** @param timeout initial budget in milliseconds */
-    public Timer(long timeout) {
-      this.remaining = timeout;
-      this.prev = System.currentTimeMillis();
-    }
-
-    /** @return {@code true} once the budget is zero or negative */
-    public boolean isTimedOut() {
-      return remaining <= 0;
-    }
-
-    /** Charges the wall-clock time since construction or the previous call against the budget. */
-    public void elapsed() {
-      long current = System.currentTimeMillis();
-      // Time passed is (current - prev). The operands used to be swapped, which made
-      // the budget grow, so the timer could never expire.
-      remaining -= (current - prev);
-      prev = current;
-    }
-
-    /**
-     * Sleeps for {@code wait} milliseconds, capped at the remaining budget. Does nothing
-     * when {@code wait} is not positive or the timer has expired. The budget itself is
-     * not changed here; call {@link #elapsed()} afterwards.
-     *
-     * @param wait requested pause in milliseconds
-     */
-    public void interval(long wait) {
-      if (wait <= 0 || isTimedOut()) {
-        return;
-      }
-      try {
-        Thread.sleep(Math.min(remaining, wait));
-      } catch (InterruptedException ex) {
-        // Cut the pause short but keep the interrupt visible to the caller.
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    /** @return milliseconds left as of the last {@link #elapsed()} call; may be negative */
-    public long remaining() {
-      return remaining;
-    }
-  }
-
-  /**
-   * Single-slot holder: the first ticket offered while the slot is empty is kept and
-   * returned to every later caller until it is cleared.
-   */
-  public static class Scrutineer<T> {
-
-    private final AtomicReference<T> reference = new AtomicReference<T>();
-
-    /**
-     * @param ticket candidate to install when the slot is empty
-     * @return {@code ticket} if it was installed, otherwise the ticket already held
-     */
-    T check(T ticket) {
-      T granted = reference.get();
-      // Retry while the slot looks empty: either our CAS wins or we read the winner.
-      for (;granted == null; granted = reference.get()) {
-        if (reference.compareAndSet(null, ticket)) {
-          return ticket;
-        }
-      }
-      return granted;
-    }
-
-    /**
-     * @param granted ticket expected to be in the slot (compared by reference)
-     * @return {@code true} if the slot held {@code granted} and was emptied
-     */
-    boolean clear(T granted) {
-      return reference.compareAndSet(granted, null);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
deleted file mode 100644
index fb1cec2..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.protobuf.ServiceException;
-
-/**
- * Template for one RPC invocation against a server: obtains a client from the
- * connection pool, runs {@link #call(NettyClientBase)} and gives the client back.
- *
- * @param <T> type of the value produced by the call
- */
-public abstract class ServerCallable<T> {
-  protected InetSocketAddress addr;
-  protected long startTime;
-  protected long endTime;
-  protected Class<?> protocol;
-  protected boolean asyncMode;
-  /** When {@code true} the client is closed after the call instead of just released. */
-  protected boolean closeConn;
-  protected RpcConnectionPool connPool;
-
-  /**
-   * The actual remote invocation.
-   *
-   * @param client client for {@link #addr}; {@code null} when {@link #withRetries()} runs
-   *               with a {@code null} address
-   * @return result of the invocation
-   * @throws Exception any failure; see the two runners for how it is reported
-   */
-  public abstract T call(NettyClientBase client) throws Exception;
-
-  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) {
-    this(connPool, addr, protocol, asyncMode, false);
-  }
-
-  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol,
-      boolean asyncMode, boolean closeConn) {
-    this.connPool = connPool;
-    this.addr = addr;
-    this.protocol = protocol;
-    this.asyncMode = asyncMode;
-    this.closeConn = closeConn;
-  }
-
-  /** Records the start time of an attempt. */
-  public void beforeCall() {
-    this.startTime = System.currentTimeMillis();
-  }
-
-  public long getStartTime(){
-    return startTime;
-  }
-
-  /** Records the end time of an attempt. */
-  public void afterCall() {
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public long getEndTime(){
-    return endTime;
-  }
-
-  // NOTE(review): plain, non-volatile flag. If abort() is invoked from a thread other
-  // than the one running withRetries(), visibility is not guaranteed -- confirm usage.
-  boolean abort = false;
-
-  /** Makes {@link #withRetries()} give up on the next I/O failure instead of retrying. */
-  public void abort() {
-    abort = true;
-  }
-
-  /**
-   * Runs this instance up to three times. An {@link IOException} triggers a retry after
-   * a pause that grows with the attempt number (500 ms, then 1000 ms), unless
-   * {@link #abort()} was called or it was the last attempt. Any other throwable fails
-   * immediately.
-   *
-   * @return an object of type T
-   * @throws com.google.protobuf.ServiceException if a remote or network exception occurs,
-   *         or if the thread is interrupted while pausing between attempts
-   */
-  public T withRetries() throws ServiceException {
-    //TODO configurable
-    final long pause = 500; //ms
-    final int numRetries = 3;
-
-    for (int tries = 0; tries < numRetries; tries++) {
-      NettyClientBase client = null;
-      try {
-        beforeCall();
-        if(addr != null) {
-          client = connPool.getConnection(addr, protocol, asyncMode);
-        }
-        return call(client);
-      } catch (IOException ioe) {
-        if(abort) {
-          throw new ServiceException(ioe.getMessage(), ioe);
-        }
-        if (tries == numRetries - 1) {
-          throw new ServiceException("Giving up after tries=" + tries, ioe);
-        }
-      } catch (Throwable t) {
-        throw new ServiceException(t);
-      } finally {
-        afterCall();
-        giveBack(client);
-      }
-      try {
-        Thread.sleep(pause * (tries + 1));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceException("Giving up after tries=" + tries, e);
-      }
-    }
-    // Not reached: the last attempt either returns or throws. Required by the compiler.
-    return null;
-  }
-
-  /**
-   * Run this instance against the server once.
-   *
-   * @return an object of type T
-   * @throws java.io.IOException if a remote or network exception occurs
-   * @throws RuntimeException other unspecified error
-   */
-  public T withoutRetries() throws IOException, RuntimeException {
-    NettyClientBase client = null;
-    try {
-      beforeCall();
-      client = connPool.getConnection(addr, protocol, asyncMode);
-      return call(client);
-    } catch (Throwable t) {
-      Throwable t2 = translateException(t);
-      if (t2 instanceof IOException) {
-        throw (IOException)t2;
-      } else {
-        throw new RuntimeException(t2);
-      }
-    } finally {
-      afterCall();
-      giveBack(client);
-    }
-  }
-
-  /** Returns the client to the pool, closing it when this callable was created with closeConn. */
-  private void giveBack(NettyClientBase client) {
-    if(closeConn) {
-      connPool.closeConnection(client);
-    } else {
-      connPool.releaseConnection(client);
-    }
-  }
-
-  /**
-   * Unwraps an {@link UndeclaredThrowableException} and then a {@link RemoteException}
-   * that has a cause, so the caller sees the underlying failure.
-   */
-  private static Throwable translateException(Throwable t) {
-    if (t instanceof UndeclaredThrowableException) {
-      t = t.getCause();
-    }
-    if (t instanceof RemoteException && t.getCause() != null) {
-      t = t.getCause();
-    }
-    return t;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
deleted file mode 100644
index 113d181..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ServiceException;
-import org.apache.commons.lang.exception.ExceptionUtils;
-
-/**
- * {@link ServiceException} that can additionally carry a stack-trace text, the
- * protocol name and the remote address related to the failure.
- */
-public class TajoServiceException extends ServiceException {
-  private String traceMessage;
-  private String protocol;
-  private String remoteAddress;
-
-  /** @param message failure description */
-  public TajoServiceException(String message) {
-    super(message);
-  }
-
-  /**
-   * @param message      failure description
-   * @param traceMessage stack-trace text to report from {@link #getTraceMessage()}
-   */
-  public TajoServiceException(String message, String traceMessage) {
-    super(message);
-    this.traceMessage = traceMessage;
-  }
-
-  /**
-   * @param message       failure description
-   * @param cause         underlying failure
-   * @param protocol      protocol name to report from {@link #getProtocol()}
-   * @param remoteAddress address to report from {@link #getRemoteAddress()}
-   */
-  public TajoServiceException(String message, Throwable cause, String protocol, String remoteAddress) {
-    super(message, cause);
-    this.remoteAddress = remoteAddress;
-    this.protocol = protocol;
-  }
-
-  /**
-   * @return the trace text given at construction; otherwise the stack trace of the
-   *         cause, rendered on first use and remembered; {@code null} when neither exists
-   */
-  public String getTraceMessage() {
-    if (traceMessage != null) {
-      return traceMessage;
-    }
-    Throwable rootedIn = getCause();
-    if (rootedIn != null) {
-      this.traceMessage = ExceptionUtils.getStackTrace(rootedIn);
-    }
-    return traceMessage;
-  }
-
-  /** @return the protocol name, or {@code null} if none was supplied */
-  public String getProtocol() {
-    return protocol;
-  }
-
-  /** @return the remote address, or {@code null} if none was supplied */
-  public String getRemoteAddress() {
-    return remoteAddress;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/DummyProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/DummyProtos.proto b/tajo-rpc/src/main/proto/DummyProtos.proto
deleted file mode 100644
index f53f0d6..0000000
--- a/tajo-rpc/src/main/proto/DummyProtos.proto
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Generated Java code goes to package org.apache.tajo.rpc.test, outer class DummyProtos.
-// presumably test fixtures only (the package name ends in ".test") -- confirm.
-option java_package = "org.apache.tajo.rpc.test";
-option java_outer_classname = "DummyProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-// Request carrying two required 32-bit integers.
-message MulRequest1 {
- required int32 x1 = 1;
- required int32 x2 = 2;
-}
-
-// Same field layout as MulRequest1, declared as a separate message type.
-message MulRequest2 {
- required int32 x1 = 1;
- required int32 x2 = 2;
-}
-
-// Response carrying two required 32-bit integer results.
-message MulResponse {
- required int32 result1 = 1;
- required int32 result2 = 2;
-}
-
-// Element type of the repeated fields below; holds one required string.
-message InnerNode {
- required string instr = 1;
-}
-
-// Request holding zero or more InnerNode entries.
-message InnerRequest {
- repeated InnerNode nodes = 1;
-}
-
-// Response holding zero or more InnerNode entries.
-message InnerResponse {
- repeated InnerNode nodes = 1;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/RpcProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/RpcProtos.proto b/tajo-rpc/src/main/proto/RpcProtos.proto
deleted file mode 100644
index 69f43ed..0000000
--- a/tajo-rpc/src/main/proto/RpcProtos.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Generated Java code goes to package org.apache.tajo.rpc, outer class RpcProtos.
-option java_package = "org.apache.tajo.rpc";
-option java_outer_classname = "RpcProtos";
-
-// Envelope of one RPC request.
-// presumably `id` pairs a response with its request (both messages carry it) -- confirm
-// against the client/server handlers.
-message RpcRequest {
- required int32 id = 1;
- required string method_name = 2;
- // Optional payload bytes; absent when the request carries no message.
- optional bytes request_message = 3;
-}
-
-// Envelope of one RPC response: an id plus an optional payload and optional error details.
-message RpcResponse {
- required int32 id = 1;
- optional bytes response_message = 2;
- // Error description fields; all three are optional.
- optional string error_class = 3;
- optional string error_message = 4;
- optional string error_trace = 5;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/TestProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/TestProtocol.proto b/tajo-rpc/src/main/proto/TestProtocol.proto
deleted file mode 100644
index 58640ea..0000000
--- a/tajo-rpc/src/main/proto/TestProtocol.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Generated Java code goes to package org.apache.tajo.rpc.test, outer class DummyProtocol.
-option java_package = "org.apache.tajo.rpc.test";
-option java_outer_classname = "DummyProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-// Provides SumRequest, SumResponse and EchoMessage used below.
-import "TestProtos.proto";
-
-// Service definition; every method except sum takes and returns an EchoMessage.
-service DummyProtocolService {
- rpc sum (SumRequest) returns (SumResponse);
- rpc echo (EchoMessage) returns (EchoMessage);
- rpc getError (EchoMessage) returns (EchoMessage);
- rpc getNull (EchoMessage) returns (EchoMessage);
- // NOTE(review): "deley" looks like a misspelling of "delay"; the name is part of the
- // generated service interface, so renaming it would affect every implementer and caller.
- rpc deley (EchoMessage) returns (EchoMessage);
- rpc throwException (EchoMessage) returns (EchoMessage);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/TestProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/TestProtos.proto b/tajo-rpc/src/main/proto/TestProtos.proto
deleted file mode 100644
index 5001c0e..0000000
--- a/tajo-rpc/src/main/proto/TestProtos.proto
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2012 Database Lab., Korea Univ.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Generated Java code goes to package org.apache.tajo.rpc.test, outer class TestProtos.
-option java_package = "org.apache.tajo.rpc.test";
-option java_outer_classname = "TestProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-// Single required string.
-message EchoMessage {
- required string message = 1;
-}
-
-// Four required operands, one of each numeric kind: int32, int64, double and float.
-message SumRequest {
- required int32 x1 = 1;
- required int64 x2 = 2;
- required double x3 = 3;
- required float x4 = 4;
-}
-
-// Single required double result.
-message SumResponse {
- required double result = 1;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/log4j.properties b/tajo-rpc/src/test/java/log4j.properties
deleted file mode 100644
index 2c4d991..0000000
--- a/tajo-rpc/src/test/java/log4j.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# log4j configuration used during build and unit tests
-
-# Root logger: level info, output to the appender named "stdout".
-log4j.rootLogger=info,stdout
-# NOTE(review): the key is spelled "threshhold" (double h). log4j 1.x documents the
-# repository-wide threshold key as "log4j.threshold", so this line may have no effect -- confirm.
-log4j.threshhold=ALL
-# Console appender "stdout" with a pattern layout.
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-# Pattern: ISO8601 date, level, logger name, (method(line)), message, newline.
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p: %c (%M(%L)) - %m%n
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
deleted file mode 100644
index a974a65..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.junit.AfterClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-
-public class TestAsyncRpc {
- private static Log LOG = LogFactory.getLog(TestAsyncRpc.class);
- private static String MESSAGE = "TestAsyncRpc";
-
- double sum;
- String echo;
-
- AsyncRpcServer server;
- AsyncRpcClient client;
- Interface stub;
- DummyProtocolAsyncImpl service;
- int retries;
-
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.METHOD)
- @interface SetupRpcConnection {
- boolean setupRpcServer() default true;
- boolean setupRpcClient() default true;
- }
-
- @Rule
- public ExternalResource resource = new ExternalResource() {
-
- private Description description;
-
- @Override
- public Statement apply(Statement base, Description description) {
- this.description = description;
- return super.apply(base, description);
- }
-
- @Override
- protected void before() throws Throwable {
- SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
- setUpRpcServer();
- }
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
- setUpRpcClient();
- }
- }
-
- @Override
- protected void after() {
- SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
- try {
- tearDownRpcClient();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
- try {
- tearDownRpcServer();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
- }
-
- };
-
- public void setUpRpcServer() throws Exception {
- service = new DummyProtocolAsyncImpl();
- server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), 2);
- server.start();
- }
-
- public void setUpRpcClient() throws Exception {
- retries = 1;
-
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(
- RpcUtils.getConnectAddress(server.getListenAddress()),
- DummyProtocol.class, true);
- client = new AsyncRpcClient(rpcConnectionKey, retries);
- client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT);
- stub = client.getStub();
- }
-
- @AfterClass
- public static void tearDownClass() throws Exception {
- RpcChannelFactory.shutdownGracefully();
- }
-
- public void tearDownRpcServer() throws Exception {
- if(server != null) {
- server.shutdown();
- server = null;
- }
- }
-
- public void tearDownRpcClient() throws Exception {
- if(client != null) {
- client.close();
- client = null;
- }
- }
-
- boolean calledMarker = false;
-
- @Test
- public void testRpc() throws Exception {
-
- SumRequest sumRequest = SumRequest.newBuilder()
- .setX1(1)
- .setX2(2)
- .setX3(3.15d)
- .setX4(2.0f).build();
-
- stub.sum(null, sumRequest, new RpcCallback<SumResponse>() {
- @Override
- public void run(SumResponse parameter) {
- sum = parameter.getResult();
- assertTrue(8.15d == sum);
- }
- });
-
-
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
- @Override
- public void run(EchoMessage parameter) {
- echo = parameter.getMessage();
- assertEquals(MESSAGE, echo);
- calledMarker = true;
- }
- };
- stub.echo(null, echoMessage, callback);
- Thread.sleep(1000);
- assertTrue(calledMarker);
- }
-
- private CountDownLatch testNullLatch;
-
- @Test
- public void testGetNull() throws Exception {
- testNullLatch = new CountDownLatch(1);
- stub.getNull(null, null, new RpcCallback<EchoMessage>() {
- @Override
- public void run(EchoMessage parameter) {
- assertNull(parameter);
- LOG.info("testGetNull retrieved");
- testNullLatch.countDown();
- }
- });
- assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS));
- assertTrue(service.getNullCalled);
- }
-
- @Test
- public void testCallFuture() throws Exception {
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
- stub.deley(null, echoMessage, future);
-
- assertFalse(future.isDone());
- assertEquals(future.get(), echoMessage);
- assertTrue(future.isDone());
- }
-
- @Test
- public void testCallFutureTimeout() throws Exception {
- boolean timeout = false;
- try {
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
- stub.deley(null, echoMessage, future);
-
- assertFalse(future.isDone());
- future.get(1, TimeUnit.SECONDS);
- } catch (TimeoutException te) {
- timeout = true;
- }
- assertTrue(timeout);
- }
-
- @Test
- public void testCallFutureDisconnected() throws Exception {
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-
- tearDownRpcServer();
-
- stub.echo(future.getController(), echoMessage, future);
- EchoMessage response = future.get();
-
- assertNull(response);
- assertTrue(future.getController().failed());
- assertTrue(future.getController().errorText() != null);
- }
-
- @Test
- public void testStubDisconnected() throws Exception {
-
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-
- if (server != null) {
- server.shutdown(true);
- server = null;
- }
-
- stub = client.getStub();
- stub.echo(future.getController(), echoMessage, future);
- EchoMessage response = future.get();
-
- assertNull(response);
- assertTrue(future.getController().failed());
- assertTrue(future.getController().errorText() != null);
- }
-
- @Test
- @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
- public void testConnectionRetry() throws Exception {
- retries = 10;
- ServerSocket serverSocket = new ServerSocket(0);
- final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
- serverSocket.close();
- service = new DummyProtocolAsyncImpl();
-
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-
- //lazy startup
- Thread serverThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- server = new AsyncRpcServer(DummyProtocol.class,
- service, address, 2);
- } catch (Exception e) {
- fail(e.getMessage());
- }
- server.start();
- }
- });
- serverThread.start();
-
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
- client = new AsyncRpcClient(rpcConnectionKey, retries);
- assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
- stub = client.getStub();
- stub.echo(future.getController(), echoMessage, future);
-
- assertFalse(future.isDone());
- assertEquals(echoMessage, future.get());
- assertTrue(future.isDone());
- }
-
- @Test
- public void testConnectionFailure() throws Exception {
- InetSocketAddress address = new InetSocketAddress("test", 0);
- try {
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
- NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
- assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
- } catch (Throwable throwable) {
- fail();
- }
- }
-
- @Test
- @SetupRpcConnection(setupRpcClient=false)
- public void testUnresolvedAddress() throws Exception {
- String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(
- RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
- client = new AsyncRpcClient(rpcConnectionKey, retries);
- assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
- Interface stub = client.getStub();
- EchoMessage echoMessage = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
- stub.deley(null, echoMessage, future);
-
- assertFalse(future.isDone());
- assertEquals(future.get(), echoMessage);
- assertTrue(future.isDone());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
deleted file mode 100644
index 10dd766..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.tajo.rpc.test.DummyProtocol;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.junit.AfterClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-public class TestBlockingRpc {
- public static final String MESSAGE = "TestBlockingRpc";
-
- private BlockingRpcServer server;
- private BlockingRpcClient client;
- private BlockingInterface stub;
- private DummyProtocolBlockingImpl service;
- private int retries;
-
- @Retention(RetentionPolicy.RUNTIME)
- @Target(ElementType.METHOD)
- @interface SetupRpcConnection {
- boolean setupRpcServer() default true;
- boolean setupRpcClient() default true;
- }
-
- @Rule
- public ExternalResource resource = new ExternalResource() {
-
- private Description description;
-
- @Override
- public Statement apply(Statement base, Description description) {
- this.description = description;
- return super.apply(base, description);
- }
-
- @Override
- protected void before() throws Throwable {
- SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
- setUpRpcServer();
- }
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
- setUpRpcClient();
- }
- }
-
- @Override
- protected void after() {
- SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
- try {
- tearDownRpcClient();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
-
- if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
- try {
- tearDownRpcServer();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
- }
-
- };
-
- public void setUpRpcServer() throws Exception {
- service = new DummyProtocolBlockingImpl();
- server = new BlockingRpcServer(DummyProtocol.class, service,
- new InetSocketAddress("127.0.0.1", 0), 2);
- server.start();
- }
-
- public void setUpRpcClient() throws Exception {
- retries = 1;
-
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(
- RpcUtils.getConnectAddress(server.getListenAddress()),
- DummyProtocol.class, false);
- client = new BlockingRpcClient(rpcConnectionKey, retries);
- assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
- stub = client.getStub();
- }
-
- @AfterClass
- public static void tearDownClass() throws Exception {
- RpcChannelFactory.shutdownGracefully();
- }
-
- public void tearDownRpcServer() throws Exception {
- if(server != null) {
- server.shutdown();
- server = null;
- }
- }
-
- public void tearDownRpcClient() throws Exception {
- if(client != null) {
- client.close();
- client = null;
- }
- }
-
- @Test
- public void testRpc() throws Exception {
- SumRequest request = SumRequest.newBuilder()
- .setX1(1)
- .setX2(2)
- .setX3(3.15d)
- .setX4(2.0f).build();
- SumResponse response1 = stub.sum(null, request);
- assertEquals(8.15d, response1.getResult(), 1e-15);
-
- EchoMessage message = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- EchoMessage response2 = stub.echo(null, message);
- assertEquals(MESSAGE, response2.getMessage());
- }
-
- @Test
- @SetupRpcConnection(setupRpcClient=false)
- public void testRpcWithServiceCallable() throws Exception {
- RpcConnectionPool pool = RpcConnectionPool.getPool();
- final SumRequest request = SumRequest.newBuilder()
- .setX1(1)
- .setX2(2)
- .setX3(3.15d)
- .setX4(2.0f).build();
-
- SumResponse response =
- new ServerCallable<SumResponse>(pool,
- server.getListenAddress(), DummyProtocol.class, false) {
- @Override
- public SumResponse call(NettyClientBase client) throws Exception {
- BlockingInterface stub2 = client.getStub();
- SumResponse response1 = stub2.sum(null, request);
- return response1;
- }
- }.withRetries();
-
- assertEquals(8.15d, response.getResult(), 1e-15);
-
- response =
- new ServerCallable<SumResponse>(pool,
- server.getListenAddress(), DummyProtocol.class, false) {
- @Override
- public SumResponse call(NettyClientBase client) throws Exception {
- BlockingInterface stub2 = client.getStub();
- SumResponse response1 = stub2.sum(null, request);
- return response1;
- }
- }.withoutRetries();
-
- assertTrue(8.15d == response.getResult());
- pool.close();
- }
-
- @Test
- public void testThrowException() throws Exception {
- EchoMessage message = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
-
- try {
- stub.throwException(null, message);
- fail("RpcCall should throw exception");
- } catch (Throwable t) {
- assertTrue(t instanceof TajoServiceException);
- assertEquals("Exception Test", t.getMessage());
- TajoServiceException te = (TajoServiceException)t;
- assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol());
- assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(),
- te.getRemoteAddress());
- }
- }
-
- @Test
- @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
- public void testConnectionRetry() throws Exception {
- retries = 10;
- ServerSocket serverSocket = new ServerSocket(0);
- final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
- serverSocket.close();
-
- EchoMessage message = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
-
- //lazy startup
- Thread serverThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
- } catch (Exception e) {
- fail(e.getMessage());
- }
- server.start();
- }
- });
- serverThread.start();
-
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false);
- client = new BlockingRpcClient(rpcConnectionKey, retries);
- assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
- stub = client.getStub();
-
- EchoMessage response = stub.echo(null, message);
- assertEquals(MESSAGE, response.getMessage());
- }
-
- @Test
- public void testConnectionFailed() throws Exception {
- NettyClientBase client = null;
-
- try {
- int port = server.getListenAddress().getPort() + 1;
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(
- RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)),
- DummyProtocol.class, false);
- client = new BlockingRpcClient(rpcConnectionKey, retries);
- assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
- client.close();
- } catch (Throwable ce){
- if (client != null) {
- client.close();
- }
- fail();
- }
- }
-
- @Test
- public void testGetNull() throws Exception {
- assertNull(stub.getNull(null, null));
- assertTrue(service.getNullCalled);
- }
-
- @Test
- public void testShutdown() throws Exception {
- final StringBuilder error = new StringBuilder();
- Thread callThread = new Thread() {
- public void run() {
- try {
- EchoMessage message = EchoMessage.newBuilder()
- .setMessage(MESSAGE)
- .build();
- stub.deley(null, message);
- } catch (Exception e) {
- error.append(e.getMessage());
- }
- synchronized(error) {
- error.notifyAll();
- }
- }
- };
-
- callThread.start();
-
- final CountDownLatch latch = new CountDownLatch(1);
- Thread shutdownThread = new Thread() {
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- try {
- server.shutdown();
- server = null;
- latch.countDown();
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- };
- shutdownThread.start();
-
- assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS));
-
- assertTrue(latch.getCount() == 0);
-
- synchronized(error) {
- error.wait(5 * 1000);
- }
-
- if(!error.toString().isEmpty()) {
- fail(error.toString());
- }
- }
-
- @Test
- @SetupRpcConnection(setupRpcClient=false)
- public void testUnresolvedAddress() throws Exception {
- String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
- RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
- new RpcConnectionPool.RpcConnectionKey(
- RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
- client = new BlockingRpcClient(rpcConnectionKey, retries);
- assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
- BlockingInterface stub = client.getStub();
-
- EchoMessage message = EchoMessage.newBuilder()
- .setMessage(MESSAGE).build();
- EchoMessage response2 = stub.echo(null, message);
- assertEquals(MESSAGE, response2.getMessage());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
deleted file mode 100644
index 0ca7563..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.test.impl;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-
-public class DummyProtocolAsyncImpl implements Interface {
- private static final Log LOG =
- LogFactory.getLog(DummyProtocolAsyncImpl.class);
- public boolean getNullCalled = false;
- public boolean getErrorCalled = false;
-
- @Override
- public void sum(RpcController controller, SumRequest request,
- RpcCallback<SumResponse> done) {
-
- SumResponse response = SumResponse.newBuilder().setResult(
- request.getX1()+request.getX2()+request.getX3()+request.getX4()
- ).build();
- done.run(response);
- }
-
- @Override
- public void echo(RpcController controller, EchoMessage request,
- RpcCallback<EchoMessage> done) {
-
- done.run(request);
- }
-
- @Override
- public void getError(RpcController controller, EchoMessage request,
- RpcCallback<EchoMessage> done) {
- LOG.info("noCallback is called");
- getErrorCalled = true;
- controller.setFailed(request.getMessage());
- done.run(request);
- }
-
- @Override
- public void getNull(RpcController controller, EchoMessage request,
- RpcCallback<EchoMessage> done) {
- getNullCalled = true;
- LOG.info("noCallback is called");
- done.run(null);
- }
-
- @Override
- public void deley(RpcController controller, EchoMessage request,
- RpcCallback<EchoMessage> done) {
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- LOG.error(e.getMessage());
- }
-
- done.run(request);
- }
-
- public void throwException(RpcController controller, EchoMessage request,
- RpcCallback<EchoMessage> done) {
- done.run(request);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
deleted file mode 100644
index 8d4b597..0000000
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc.test.impl;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
-import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
-import org.apache.tajo.rpc.test.TestProtos.SumRequest;
-import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-
-public class DummyProtocolBlockingImpl implements BlockingInterface {
- private static final Log LOG =
- LogFactory.getLog(DummyProtocolBlockingImpl.class);
- public boolean getNullCalled = false;
- public boolean getErrorCalled = false;
-
- @Override
- public SumResponse sum(RpcController controller, SumRequest request)
- throws ServiceException {
- return SumResponse.newBuilder().setResult(
- request.getX1()+request.getX2()+request.getX3()+request.getX4()
- ).build();
- }
-
- @Override
- public EchoMessage echo(RpcController controller, EchoMessage request)
- throws ServiceException {
- return EchoMessage.newBuilder().
- setMessage(request.getMessage()).build();
- }
-
- @Override
- public EchoMessage getError(RpcController controller, EchoMessage request)
- throws ServiceException {
- getErrorCalled = true;
- controller.setFailed(request.getMessage());
- return request;
- }
-
- @Override
- public EchoMessage getNull(RpcController controller, EchoMessage request)
- throws ServiceException {
- getNullCalled = true;
- LOG.info("noCallback is called");
- return null;
- }
-
- @Override
- public EchoMessage deley(RpcController controller, EchoMessage request)
- throws ServiceException {
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- //throw new ServiceException(e.getMessage(), e);
- }
-
- return request;
- }
-
- public EchoMessage throwException(RpcController controller, EchoMessage request)
- throws ServiceException {
- throw new ServiceException("Exception Test");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/pom.xml b/tajo-rpc/tajo-rpc-common/pom.xml
new file mode 100644
index 0000000..2b1cd7a
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/pom.xml
@@ -0,0 +1,216 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>tajo-project</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
+ <groupId>org.apache.tajo</groupId>
+ <relativePath>../../tajo-project</relativePath>
+ </parent>
+ <packaging>jar</packaging>
+ <artifactId>tajo-rpc-common</artifactId>
+ <name>Tajo Rpc Common</name>
+ <description>Common Implementation for Netty</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ </configuration>
+ <executions>
+ <execution>
+ <id>create-jar</id>
+ <phase>prepare-package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <repositories>
+ <repository>
+ <id>repository.jboss.org</id>
+ <url>https://repository.jboss.org/nexus/content/repositories/releases/
+ </url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <profiles>
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>dist</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>dist</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <echo file="${project.build.directory}/dist-layout-stitching.sh">
+ run() {
+ echo "\$ ${@}"
+ "${@}"
+ res=$?
+ if [ $res != 0 ]; then
+ echo
+ echo "Failed!"
+ echo
+ exit $res
+ fi
+ }
+
+ ROOT=`cd ${basedir}/..;pwd`
+ echo
+ echo "Current directory `pwd`"
+ echo
+ run rm -rf ${project.artifactId}-${project.version}
+ run mkdir ${project.artifactId}-${project.version}
+ run cd ${project.artifactId}-${project.version}
+ run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar .
+ echo
+ echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}"
+ echo
+ </echo>
+ <exec executable="sh" dir="${project.build.directory}" failonerror="true">
+ <arg line="./dist-layout-stitching.sh" />
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ </plugin>
+ </plugins>
+ </reporting>
+
+</project>
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
new file mode 100644
index 0000000..ad443d7
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Base class for Netty-based servers.
+ *
+ * <p>Lifecycle as implemented here: construct with the address to listen on,
+ * call {@link #init(ChannelInitializer, int)} to build the {@link ServerBootstrap},
+ * call {@link #start()} to bind, and call {@link #shutdown()} (or
+ * {@link #shutdown(boolean)}) to stop. Registered {@link RpcEventListener}s are
+ * notified before and after each of these three phases.
+ */
+public class NettyServerBase {
+  private static final Log LOG = LogFactory.getLog(NettyServerBase.class);
+  private static final String DEFAULT_PREFIX = "RpcServer_";
+  private static final AtomicInteger sequenceId = new AtomicInteger(0);
+
+  protected String serviceName;
+  /** Address handed to {@code bind()}; resolved in {@link #start()}. */
+  protected InetSocketAddress serverAddr;
+  /** Local address reported by the bound channel; {@code null} until {@link #start()} succeeds. */
+  protected InetSocketAddress bindAddress;
+  protected ChannelInitializer<Channel> initializer;
+  protected ServerBootstrap bootstrap;
+  protected ChannelFuture channelFuture;
+  // Closed on shutdown. Nothing in this class adds channels to it; presumably
+  // subclasses/handlers register accepted channels here -- verify against callers.
+  protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+  /** Address requested by the caller; port 0 means "pick an unused port in start()". */
+  private final InetSocketAddress initIsa;
+  private final Set<RpcEventListener> listeners =
+      Collections.synchronizedSet(new HashSet<RpcEventListener>());
+
+  /**
+   * Creates a server without a name; a default name is generated in {@link #start()}.
+   *
+   * @param address address to listen on (port 0 selects an unused port at start)
+   */
+  public NettyServerBase(InetSocketAddress address) {
+    this.initIsa = address;
+  }
+
+  /**
+   * Creates a named server.
+   *
+   * @param serviceName name used for the channel factory and in log messages
+   * @param addr address to listen on (port 0 selects an unused port at start)
+   */
+  public NettyServerBase(String serviceName, InetSocketAddress addr) {
+    this.serviceName = serviceName;
+    this.initIsa = addr;
+  }
+
+  /**
+   * Sets the service name.
+   *
+   * @param name new service name
+   */
+  public void setName(String name) {
+    this.serviceName = name;
+  }
+
+  /**
+   * Builds and configures the server bootstrap. Does not bind; see {@link #start()}.
+   *
+   * @param initializer child handler installed on every accepted channel
+   * @param workerNum value passed through to
+   *                  {@code RpcChannelFactory.createServerChannelFactory}
+   *                  (presumably the worker thread count)
+   */
+  public void init(ChannelInitializer<Channel> initializer, int workerNum) {
+    for (RpcEventListener listener : listenerSnapshot()) {
+      listener.onBeforeInit(this);
+    }
+
+    bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
+
+    this.initializer = initializer;
+    bootstrap
+        .channel(NioServerSocketChannel.class)
+        .childHandler(initializer)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .option(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+        .childOption(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+        .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10);
+
+    for (RpcEventListener listener : listenerSnapshot()) {
+      listener.onAfterInit(this);
+    }
+  }
+
+  /**
+   * Returns the address the server channel is bound to.
+   *
+   * @return the bound local address, or {@code null} if {@link #start()} has not completed
+   */
+  public InetSocketAddress getListenAddress() {
+    return this.bindAddress;
+  }
+
+  /**
+   * Binds the server socket and blocks until the bind completes.
+   * {@link #init(ChannelInitializer, int)} must have been called first.
+   *
+   * @throws IllegalStateException if port 0 was requested and probing for an
+   *                               unused port failed with an I/O error
+   */
+  public void start() {
+    for (RpcEventListener listener : listenerSnapshot()) {
+      listener.onBeforeStart(this);
+    }
+
+    if (serviceName == null) {
+      this.serviceName = getNextDefaultServiceName();
+    }
+
+    if (initIsa.getPort() == 0) {
+      try {
+        int port = getUnusedPort();
+        serverAddr = new InetSocketAddress(initIsa.getHostName(), port);
+      } catch (IOException e) {
+        // Fail here with the real cause instead of logging and then binding
+        // with an unresolved (possibly null) address.
+        throw new IllegalStateException(
+            "Cannot find an unused port for Rpc (" + serviceName + ")", e);
+      }
+    } else {
+      serverAddr = initIsa;
+    }
+
+    this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly();
+    this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress();
+
+    for (RpcEventListener listener : listenerSnapshot()) {
+      listener.onAfterStart(this);
+    }
+    LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress);
+  }
+
+  /**
+   * Returns the server channel. Only valid after {@link #start()}.
+   *
+   * @return the channel produced by the bind
+   */
+  public Channel getChannel() {
+    return this.channelFuture.channel();
+  }
+
+  /** Shuts the server down without waiting for the event loop threads to stop. */
+  public void shutdown() {
+    shutdown(false);
+  }
+
+  /**
+   * Closes the {@code accepted} channel group and gracefully shuts down both
+   * event loop groups of the bootstrap.
+   *
+   * @param waitUntilThreadsStop if {@code true}, block until each event loop
+   *                             group has terminated
+   */
+  public void shutdown(boolean waitUntilThreadsStop) {
+    for (RpcEventListener listener : listenerSnapshot()) {
+      listener.onBeforeShutdown(this);
+    }
+
+    try {
+      accepted.close();
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+    }
+
+    if (bootstrap != null) {
+      if (bootstrap.childGroup() != null) {
+        bootstrap.childGroup().shutdownGracefully();
+        if (waitUntilThreadsStop) {
+          bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
+        }
+      }
+
+      if (bootstrap.group() != null) {
+        bootstrap.group().shutdownGracefully();
+        if (waitUntilThreadsStop) {
+          // Wait on the parent (boss) group itself, not on the child group again.
+          bootstrap.group().terminationFuture().awaitUninterruptibly();
+        }
+      }
+    }
+
+    for (RpcEventListener listener : listenerSnapshot()) {
+      listener.onAfterShutdown(this);
+    }
+
+    if (bindAddress != null) {
+      LOG.info("Rpc (" + serviceName + ") listened on "
+          + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+    }
+  }
+
+  /**
+   * Copies the registered listeners so callbacks run outside the set's lock and
+   * a listener may add/remove listeners without breaking the iteration.
+   * {@code toArray} executes under the synchronized set's own mutex.
+   */
+  private RpcEventListener[] listenerSnapshot() {
+    return listeners.toArray(new RpcEventListener[0]);
+  }
+
+  private static String getNextDefaultServiceName() {
+    return DEFAULT_PREFIX + sequenceId.getAndIncrement();
+  }
+
+  private static final int startPortRange = 10000;
+  private static final int endPortRange = 50000;
+  private static final Random rnd = new Random(System.currentTimeMillis());
+  // each system has a different starting port number within the given range.
+  private static final AtomicInteger nextPortNum =
+      new AtomicInteger(startPortRange + rnd.nextInt(endPortRange - startPortRange));
+
+  /**
+   * Scans upward from the shared port cursor, wrapping from
+   * {@code endPortRange} back to {@code startPortRange}, until a port passes
+   * {@link #available(int)}. Loops until one is found.
+   *
+   * <p>The method holds the class monitor for its whole duration, so no
+   * additional lock is needed around the wrap-around.
+   */
+  private synchronized static int getUnusedPort() throws IOException {
+    while (true) {
+      int port = nextPortNum.getAndIncrement();
+      if (port >= endPortRange) {
+        nextPortNum.set(startPortRange);
+        port = nextPortNum.getAndIncrement();
+      }
+      if (available(port)) {
+        return port;
+      }
+    }
+  }
+
+  /**
+   * Probes a port by opening, then closing, both a TCP server socket and a UDP
+   * socket on it.
+   *
+   * @param port port to probe; must be within 1024..65535
+   * @return {@code true} if both sockets could be opened, {@code false} if
+   *         opening either one failed with an {@link IOException}
+   * @throws IllegalArgumentException if {@code port} is outside 1024..65535
+   * @throws IOException if closing a probe socket fails
+   */
+  private static boolean available(int port) throws IOException {
+    if (port < 1024 || port > 65535) {
+      throw new IllegalArgumentException("Port Number Out of Bound: " + port);
+    }
+
+    ServerSocket ss = null;
+    DatagramSocket ds = null;
+
+    try {
+      ss = new ServerSocket(port);
+      ss.setReuseAddress(true);
+
+      ds = new DatagramSocket(port);
+      ds.setReuseAddress(true);
+
+      return true;
+
+    } catch (IOException e) {
+      return false;
+    } finally {
+      // Nested finally: the datagram socket is released even if closing the
+      // server socket throws.
+      try {
+        if (ss != null) {
+          ss.close();
+        }
+      } finally {
+        if (ds != null) {
+          ds.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers a lifecycle listener.
+   *
+   * @param listener listener to add
+   */
+  public void addListener(RpcEventListener listener) {
+    listeners.add(listener);
+  }
+
+  /**
+   * Unregisters a lifecycle listener.
+   *
+   * @param listener listener to remove
+   */
+  public void removeListener(RpcEventListener listener) {
+    listeners.remove(listener);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java
new file mode 100644
index 0000000..30c110d
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+/**
+ * Unchecked exception type of the {@code org.apache.tajo.rpc} package.
+ * Mirrors the four standard {@link RuntimeException} constructors.
+ */
+public class RemoteException extends RuntimeException {
+  /**
+   * Creates an exception with both a detail message and a cause.
+   *
+   * @param message detail message
+   * @param t underlying cause
+   */
+  public RemoteException(String message, Throwable t) {
+    super(message, t);
+  }
+
+  /**
+   * Creates an exception wrapping a cause.
+   *
+   * @param t underlying cause
+   */
+  public RemoteException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * Creates an exception with a detail message and no cause.
+   *
+   * @param message detail message
+   */
+  public RemoteException(String message) {
+    super(message);
+  }
+
+  /** Creates an exception with neither a detail message nor a cause. */
+  public RemoteException() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
new file mode 100644
index 0000000..3c054ad
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Unchecked exception raised when an operation gave up after repeated attempts.
+ * The list-based constructors fold every prior failure into the detail message.
+ */
+public class RetriesExhaustedException extends RuntimeException {
+  private static final long serialVersionUID = 1876775844L;
+
+  /**
+   * Creates the exception with a plain detail message.
+   *
+   * @param msg detail message
+   */
+  public RetriesExhaustedException(final String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates the exception with a detail message and the I/O failure behind it.
+   *
+   * @param msg detail message
+   * @param e cause
+   */
+  public RetriesExhaustedException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
+  /**
+   * Value holder pairing a {@link Throwable} with the time it happened and a
+   * free-form description.
+   */
+  public static class ThrowableWithExtraContext {
+    private final Throwable t;
+    private final long when;
+    private final String extras;
+
+    /**
+     * @param t the failure
+     * @param when timestamp in milliseconds, rendered through {@link Date}
+     * @param extras additional context text
+     */
+    public ThrowableWithExtraContext(final Throwable t, final long when,
+        final String extras) {
+      this.t = t;
+      this.when = when;
+      this.extras = extras;
+    }
+
+    /** Renders as {@code "<date>, <extras>, <throwable>"}. */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder();
+      text.append(new Date(this.when).toString());
+      text.append(", ").append(extras);
+      text.append(", ").append(t.toString());
+      return text.toString();
+    }
+  }
+
+  /**
+   * Builds the exception from the failures collected while retrying.
+   *
+   * @param callableVitals description of the {@code ServerCallable} that was
+   *                       in use when retrying was abandoned
+   * @param numTries number of tries made; the message reports {@code numTries + 1}
+   * @param exceptions failures seen before giving up, one per message line
+   */
+  public RetriesExhaustedException(final String callableVitals, int numTries,
+      List<Throwable> exceptions) {
+    super(describe("Failed contacting " + callableVitals + " after ",
+        numTries, " attempts.\nExceptions:\n", exceptions));
+  }
+
+  /**
+   * Builds the exception from the failures collected while retrying.
+   *
+   * @param numTries number of tries made; the message reports {@code numTries + 1}
+   * @param exceptions failures seen before giving up, one per message line
+   */
+  public RetriesExhaustedException(final int numTries,
+      final List<Throwable> exceptions) {
+    super(describe("Failed after attempts=", numTries, ", exceptions:\n", exceptions));
+  }
+
+  /**
+   * Assembles {@code lead + (numTries + 1) + tail} followed by the
+   * {@code toString()} of each failure, each terminated by a newline.
+   */
+  private static String describe(String lead, int numTries, String tail,
+      List<Throwable> exceptions) {
+    StringBuilder text = new StringBuilder(lead);
+    text.append(numTries + 1).append(tail);
+    for (Throwable failure : exceptions) {
+      text.append(failure.toString()).append("\n");
+    }
+    return text.toString();
+  }
+}
\ No newline at end of file