You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/09 10:18:25 UTC

tajo git commit: TAJO-2014: TestRpcClientManager fails occasionally.

Repository: tajo
Updated Branches:
  refs/heads/master c1beaa71c -> b1959116a


TAJO-2014: TestRpcClientManager fails occasionally.

Closes #905


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b1959116
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b1959116
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b1959116

Branch: refs/heads/master
Commit: b1959116a7c129bc7de4156c366c81cebc14958a
Parents: c1beaa7
Author: Jinho Kim <jh...@apache.org>
Authored: Wed Dec 9 18:17:38 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed Dec 9 18:17:38 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/rpc/NettyServerBase.java    | 13 ++--
 .../org/apache/tajo/rpc/RpcClientManager.java   | 10 +--
 .../apache/tajo/rpc/TestRpcClientManager.java   | 79 +++++++++-----------
 .../apache/tajo/storage/TestFileTablespace.java | 24 +++---
 .../tajo/storage/raw/TestDirectRawFile.java     |  8 +-
 6 files changed, 64 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a7a7966..6f20d8d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -57,6 +57,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2014: TestRpcClientManager fails occasionally. (jinho)
+
     TAJO-2000: BSTIndex can cause OOM. (jinho)
 
     TAJO-1992: \set timezone in cli doesn't work because of casesensitive (DaeMyung)

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 08d877f..5d1ad26 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -140,22 +141,22 @@ public class NettyServerBase {
     for (RpcEventListener listener: listeners) {
       listener.onBeforeShutdown(this);
     }
-    
+
     try {
       accepted.close();
 
-      if(bootstrap != null) {
+      if (bootstrap != null) {
         if (bootstrap.childGroup() != null) {
-          bootstrap.childGroup().shutdownGracefully();
+          Future future = bootstrap.childGroup().shutdownGracefully();
           if (waitUntilThreadsStop) {
-            bootstrap.childGroup().terminationFuture().sync();
+            future.sync();
           }
         }
 
         if (bootstrap.group() != null) {
-          bootstrap.group().shutdownGracefully();
+          Future future = bootstrap.group().shutdownGracefully();
           if (waitUntilThreadsStop) {
-            bootstrap.childGroup().terminationFuture().sync();
+            future.sync();
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index 032cf35..67c5936 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -18,10 +18,10 @@
 
 package org.apache.tajo.rpc;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.internal.logging.CommonsLoggerFactory;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 import org.apache.commons.logging.Log;
@@ -106,7 +106,7 @@ public class RpcClientManager {
         public void channelRegistered(ChannelHandlerContext ctx) {
           /* Register client to managed map */
           clients.put(target.getKey(), target);
-          target.getChannel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey()));
+          ctx.channel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey()));
         }
 
         @Override
@@ -210,7 +210,7 @@ public class RpcClientManager {
     }
   }
 
-  static class ClientCloseFutureListener implements GenericFutureListener {
+  static class ClientCloseFutureListener implements ChannelFutureListener {
     private RpcConnectionKey key;
 
     public ClientCloseFutureListener(RpcConnectionKey key) {
@@ -218,7 +218,7 @@ public class RpcClientManager {
     }
 
     @Override
-    public void operationComplete(Future future) throws Exception {
+    public void operationComplete(ChannelFuture future) throws Exception {
       clients.remove(key);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
index 865e5dd..2b50c1f 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
@@ -20,6 +20,8 @@ package org.apache.tajo.rpc;
 
 import org.apache.tajo.rpc.test.DummyProtocol;
 import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetSocketAddress;
@@ -34,15 +36,27 @@ import static org.junit.Assert.*;
 
 public class TestRpcClientManager {
 
+  static NettyServerBase server;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), 10);
+    server.start();
+
+  }
+
+  @AfterClass
+  public static void afterClass(){
+    server.shutdown(true);
+  }
+
   @Test
   public void testRaceCondition() throws Exception {
     final int parallelCount = 50;
-    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
     ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
 
-    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0), parallelCount);
-    server.start();
     try {
       final InetSocketAddress address = server.getListenAddress();
       final RpcClientManager manager = RpcClientManager.getInstance();
@@ -54,7 +68,7 @@ public class TestRpcClientManager {
               public void run() {
                 NettyClientBase client = null;
                 try {
-                  client = manager.getClient(address, DummyProtocol.class, false, new Properties());
+                  client = manager.getClient(address, DummyProtocol.class, true, new Properties());
                 } catch (Throwable e) {
                   fail(e.getMessage());
                 }
@@ -68,10 +82,9 @@ public class TestRpcClientManager {
         future.get();
       }
 
-      NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false, new Properties());
+      NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, true, new Properties());
       RpcClientManager.cleanup(clientBase);
     } finally {
-      server.shutdown(true);
       executor.shutdown();
       RpcClientManager.close();
     }
@@ -79,10 +92,6 @@ public class TestRpcClientManager {
 
   @Test
   public void testClientCloseEvent() throws Exception {
-    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
-    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0), 3);
-    server.start();
     RpcClientManager manager = RpcClientManager.getInstance();
 
     try {
@@ -97,18 +106,13 @@ public class TestRpcClientManager {
       client.close();
       assertFalse(RpcClientManager.contains(key));
     } finally {
-      server.shutdown(true);
       RpcClientManager.close();
     }
   }
 
   @Test
   public void testClientCloseEventWithReconnect() throws Exception {
-    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
-    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0), 3);
-    server.start();
-    int repeat = 10;
+    int repeat = 100;
     RpcClientManager manager = RpcClientManager.getInstance();
 
     try {
@@ -133,46 +137,37 @@ public class TestRpcClientManager {
         assertFalse(RpcClientManager.contains(key));
       }
     } finally {
-      server.shutdown(true);
       RpcClientManager.close();
     }
   }
 
   @Test
   public void testUnManagedClient() throws Exception {
-    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
-    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0), 3);
-    server.start();
     RpcConnectionKey key =
         new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
     RpcClientManager.close();
     RpcClientManager manager = RpcClientManager.getInstance();
 
-    try {
-      NettyClientBase client1 = manager.newClient(key, new Properties());
-      assertTrue(client1.isConnected());
-      assertFalse(RpcClientManager.contains(key));
+    NettyClientBase client1 = manager.newClient(key, new Properties());
+    assertTrue(client1.isConnected());
+    assertFalse(RpcClientManager.contains(key));
 
-      NettyClientBase client2 = manager.newClient(key, new Properties());
-      assertTrue(client2.isConnected());
-      assertFalse(RpcClientManager.contains(key));
+    NettyClientBase client2 = manager.newClient(key, new Properties());
+    assertTrue(client2.isConnected());
+    assertFalse(RpcClientManager.contains(key));
 
-      assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress());
-      assertNotEquals(client1.getChannel(), client2.getChannel());
+    assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress());
+    assertNotEquals(client1.getChannel(), client2.getChannel());
 
-      client1.close();
-      assertFalse(client1.isConnected());
-      assertTrue(client2.isConnected());
+    client1.close();
+    assertFalse(client1.isConnected());
+    assertTrue(client2.isConnected());
 
-      client1.connect();
-      client2.close();
-      assertFalse(client2.isConnected());
-      assertTrue(client1.isConnected());
+    client1.connect();
+    client2.close();
+    assertFalse(client2.isConnected());
+    assertTrue(client1.isConnected());
 
-      RpcClientManager.cleanup(client1, client2);
-    } finally {
-      server.shutdown(true);
-    }
+    RpcClientManager.cleanup(client1, client2);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index cd5bd32..d599a25 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -101,17 +101,17 @@ public class TestFileTablespace {
 		assertEquals(4,i);
 	}
 
-  @Test
+  @Test(timeout = 60000)
   public void testGetSplit() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
-    cluster.waitClusterUp();
     TajoConf tajoConf = new TajoConf(conf);
     tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
 
@@ -158,16 +158,16 @@ public class TestFileTablespace {
     }
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testZeroLengthSplit() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
-    cluster.waitClusterUp();
     TajoConf tajoConf = new TajoConf(conf);
     tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
 
@@ -207,17 +207,17 @@ public class TestFileTablespace {
     }
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
     conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2).build();
-    cluster.waitClusterUp();
 
     TajoConf tajoConf = new TajoConf(conf);
     tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
@@ -258,17 +258,17 @@ public class TestFileTablespace {
     }
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testGetFileTablespace() throws Exception {
     final Configuration hdfsConf = new HdfsConfiguration();
     String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
     hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
     hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    hdfsConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
     hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
 
     final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(hdfsConf).numDataNodes(2).build();
-    cluster.waitClusterUp();
+        new MiniDFSCluster.Builder(hdfsConf).numDataNodes(1).build();
     URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo");
 
     Optional<Tablespace> existingTs = Optional.empty();
@@ -287,13 +287,7 @@ public class TestFileTablespace {
 
       space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace");
       assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
-
     } finally {
-
-      if (existingTs.isPresent()) {
-        TablespaceManager.addTableSpaceForTest(existingTs.get());
-      }
-
       cluster.shutdown(true);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
index bae81a9..179c9eb 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java
@@ -100,6 +100,7 @@ public class TestDirectRawFile {
     String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
 
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
@@ -110,7 +111,6 @@ public class TestDirectRawFile {
     builder.waitSafeMode(true);
     cluster = builder.build();
 
-    cluster.waitClusterUp();
     dfs = cluster.getFileSystem();
     localFs = FileSystem.getLocal(new TajoConf());
   }
@@ -168,7 +168,7 @@ public class TestDirectRawFile {
     return writeRowBlock(conf, meta, rowBlock, outputFile);
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testRWForAllTypesWithNextTuple() throws IOException {
     int rowNum = 10000;
 
@@ -198,7 +198,7 @@ public class TestDirectRawFile {
     assertEquals(rowNum, j);
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testRepeatedScan() throws IOException {
     int rowNum = 2;
 
@@ -226,7 +226,7 @@ public class TestDirectRawFile {
     reader.close();
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testReset() throws IOException {
     int rowNum = 2;