You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/09 10:18:25 UTC
tajo git commit: TAJO-2014: TestRpcClientManager fails occasionally.
Repository: tajo
Updated Branches:
refs/heads/master c1beaa71c -> b1959116a
TAJO-2014: TestRpcClientManager fails occasionally.
Closes #905
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b1959116
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b1959116
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b1959116
Branch: refs/heads/master
Commit: b1959116a7c129bc7de4156c366c81cebc14958a
Parents: c1beaa7
Author: Jinho Kim <jh...@apache.org>
Authored: Wed Dec 9 18:17:38 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed Dec 9 18:17:38 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/rpc/NettyServerBase.java | 13 ++--
.../org/apache/tajo/rpc/RpcClientManager.java | 10 +--
.../apache/tajo/rpc/TestRpcClientManager.java | 79 +++++++++-----------
.../apache/tajo/storage/TestFileTablespace.java | 24 +++---
.../tajo/storage/raw/TestDirectRawFile.java | 8 +-
6 files changed, 64 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a7a7966..6f20d8d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -57,6 +57,8 @@ Release 0.12.0 - unreleased
BUG FIXES
+ TAJO-2014: TestRpcClientManager fails occasionally. (jinho)
+
TAJO-2000: BSTIndex can cause OOM. (jinho)
TAJO-1992: \set timezone in cli doesn't work because of casesensitive (DaeMyung)
http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 08d877f..5d1ad26 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -140,22 +141,22 @@ public class NettyServerBase {
for (RpcEventListener listener: listeners) {
listener.onBeforeShutdown(this);
}
-
+
try {
accepted.close();
- if(bootstrap != null) {
+ if (bootstrap != null) {
if (bootstrap.childGroup() != null) {
- bootstrap.childGroup().shutdownGracefully();
+ Future future = bootstrap.childGroup().shutdownGracefully();
if (waitUntilThreadsStop) {
- bootstrap.childGroup().terminationFuture().sync();
+ future.sync();
}
}
if (bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully();
+ Future future = bootstrap.group().shutdownGracefully();
if (waitUntilThreadsStop) {
- bootstrap.childGroup().terminationFuture().sync();
+ future.sync();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index 032cf35..67c5936 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -18,10 +18,10 @@
package org.apache.tajo.rpc;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.CommonsLoggerFactory;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.apache.commons.logging.Log;
@@ -106,7 +106,7 @@ public class RpcClientManager {
public void channelRegistered(ChannelHandlerContext ctx) {
/* Register client to managed map */
clients.put(target.getKey(), target);
- target.getChannel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey()));
+ ctx.channel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey()));
}
@Override
@@ -210,7 +210,7 @@ public class RpcClientManager {
}
}
- static class ClientCloseFutureListener implements GenericFutureListener {
+ static class ClientCloseFutureListener implements ChannelFutureListener {
private RpcConnectionKey key;
public ClientCloseFutureListener(RpcConnectionKey key) {
@@ -218,7 +218,7 @@ public class RpcClientManager {
}
@Override
- public void operationComplete(Future future) throws Exception {
+ public void operationComplete(ChannelFuture future) throws Exception {
clients.remove(key);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
index 865e5dd..2b50c1f 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
@@ -20,6 +20,8 @@ package org.apache.tajo.rpc;
import org.apache.tajo.rpc.test.DummyProtocol;
import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.net.InetSocketAddress;
@@ -34,15 +36,27 @@ import static org.junit.Assert.*;
public class TestRpcClientManager {
+ static NettyServerBase server;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+ server = new AsyncRpcServer(DummyProtocol.class,
+ service, new InetSocketAddress("127.0.0.1", 0), 10);
+ server.start();
+
+ }
+
+ @AfterClass
+ public static void afterClass(){
+ server.shutdown(true);
+ }
+
@Test
public void testRaceCondition() throws Exception {
final int parallelCount = 50;
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), parallelCount);
- server.start();
try {
final InetSocketAddress address = server.getListenAddress();
final RpcClientManager manager = RpcClientManager.getInstance();
@@ -54,7 +68,7 @@ public class TestRpcClientManager {
public void run() {
NettyClientBase client = null;
try {
- client = manager.getClient(address, DummyProtocol.class, false, new Properties());
+ client = manager.getClient(address, DummyProtocol.class, true, new Properties());
} catch (Throwable e) {
fail(e.getMessage());
}
@@ -68,10 +82,9 @@ public class TestRpcClientManager {
future.get();
}
- NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false, new Properties());
+ NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, true, new Properties());
RpcClientManager.cleanup(clientBase);
} finally {
- server.shutdown(true);
executor.shutdown();
RpcClientManager.close();
}
@@ -79,10 +92,6 @@ public class TestRpcClientManager {
@Test
public void testClientCloseEvent() throws Exception {
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), 3);
- server.start();
RpcClientManager manager = RpcClientManager.getInstance();
try {
@@ -97,18 +106,13 @@ public class TestRpcClientManager {
client.close();
assertFalse(RpcClientManager.contains(key));
} finally {
- server.shutdown(true);
RpcClientManager.close();
}
}
@Test
public void testClientCloseEventWithReconnect() throws Exception {
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), 3);
- server.start();
- int repeat = 10;
+ int repeat = 100;
RpcClientManager manager = RpcClientManager.getInstance();
try {
@@ -133,46 +137,37 @@ public class TestRpcClientManager {
assertFalse(RpcClientManager.contains(key));
}
} finally {
- server.shutdown(true);
RpcClientManager.close();
}
}
@Test
public void testUnManagedClient() throws Exception {
- final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
- NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
- service, new InetSocketAddress("127.0.0.1", 0), 3);
- server.start();
RpcConnectionKey key =
new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
RpcClientManager.close();
RpcClientManager manager = RpcClientManager.getInstance();
- try {
- NettyClientBase client1 = manager.newClient(key, new Properties());
- assertTrue(client1.isConnected());
- assertFalse(RpcClientManager.contains(key));
+ NettyClientBase client1 = manager.newClient(key, new Properties());
+ assertTrue(client1.isConnected());
+ assertFalse(RpcClientManager.contains(key));
- NettyClientBase client2 = manager.newClient(key, new Properties());
- assertTrue(client2.isConnected());
- assertFalse(RpcClientManager.contains(key));
+ NettyClientBase client2 = manager.newClient(key, new Properties());
+ assertTrue(client2.isConnected());
+ assertFalse(RpcClientManager.contains(key));
- assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress());
- assertNotEquals(client1.getChannel(), client2.getChannel());
+ assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress());
+ assertNotEquals(client1.getChannel(), client2.getChannel());
- client1.close();
- assertFalse(client1.isConnected());
- assertTrue(client2.isConnected());
+ client1.close();
+ assertFalse(client1.isConnected());
+ assertTrue(client2.isConnected());
- client1.connect();
- client2.close();
- assertFalse(client2.isConnected());
- assertTrue(client1.isConnected());
+ client1.connect();
+ client2.close();
+ assertFalse(client2.isConnected());
+ assertTrue(client1.isConnected());
- RpcClientManager.cleanup(client1, client2);
- } finally {
- server.shutdown(true);
- }
+ RpcClientManager.cleanup(client1, client2);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index cd5bd32..d599a25 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -101,17 +101,17 @@ public class TestFileTablespace {
assertEquals(4,i);
}
- @Test
+ @Test(timeout = 60000)
public void testGetSplit() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
- cluster.waitClusterUp();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
@@ -158,16 +158,16 @@ public class TestFileTablespace {
}
}
- @Test
+ @Test(timeout = 60000)
public void testZeroLengthSplit() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
- cluster.waitClusterUp();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
@@ -207,17 +207,17 @@ public class TestFileTablespace {
}
}
- @Test
+ @Test(timeout = 60000)
public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
final Configuration conf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
- cluster.waitClusterUp();
TajoConf tajoConf = new TajoConf(conf);
tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
@@ -258,17 +258,17 @@ public class TestFileTablespace {
}
}
- @Test
+ @Test(timeout = 60000)
public void testGetFileTablespace() throws Exception {
final Configuration hdfsConf = new HdfsConfiguration();
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ hdfsConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
final MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(hdfsConf).numDataNodes(2).build();
- cluster.waitClusterUp();
+ new MiniDFSCluster.Builder(hdfsConf).numDataNodes(1).build();
URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo");
Optional<Tablespace> existingTs = Optional.empty();
@@ -287,13 +287,7 @@ public class TestFileTablespace {
space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace");
assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
-
} finally {
-
- if (existingTs.isPresent()) {
- TablespaceManager.addTableSpaceForTest(existingTs.get());
- }
-
cluster.shutdown(true);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
index bae81a9..179c9eb 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
@@ -100,6 +100,7 @@ public class TestDirectRawFile {
String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
@@ -110,7 +111,6 @@ public class TestDirectRawFile {
builder.waitSafeMode(true);
cluster = builder.build();
- cluster.waitClusterUp();
dfs = cluster.getFileSystem();
localFs = FileSystem.getLocal(new TajoConf());
}
@@ -168,7 +168,7 @@ public class TestDirectRawFile {
return writeRowBlock(conf, meta, rowBlock, outputFile);
}
- @Test
+ @Test(timeout = 60000)
public void testRWForAllTypesWithNextTuple() throws IOException {
int rowNum = 10000;
@@ -198,7 +198,7 @@ public class TestDirectRawFile {
assertEquals(rowNum, j);
}
- @Test
+ @Test(timeout = 60000)
public void testRepeatedScan() throws IOException {
int rowNum = 2;
@@ -226,7 +226,7 @@ public class TestDirectRawFile {
reader.close();
}
- @Test
+ @Test(timeout = 60000)
public void testReset() throws IOException {
int rowNum = 2;