You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/18 18:25:44 UTC

[05/13] tajo git commit: TAJO-1405: Fix some illegal usages of the connection pool. (Contributed by navis, Committed by Keuntae Park)

TAJO-1405: Fix some illegal usages of the connection pool. (Contributed by navis, Committed by Keuntae Park)

Closes #425


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/286b9567
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/286b9567
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/286b9567

Branch: refs/heads/index_support
Commit: 286b956795a4dc2efb72c97896d86ed1049485e3
Parents: 0dc7d68
Author: Keuntae Park <si...@apache.org>
Authored: Tue Mar 17 13:47:20 2015 +0900
Committer: Keuntae Park <si...@apache.org>
Committed: Tue Mar 17 13:47:20 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../apache/tajo/client/SessionConnection.java   | 18 +++--
 .../tajo/worker/ExecutionBlockContext.java      | 43 +++++++----
 .../main/java/org/apache/tajo/worker/Task.java  | 75 +++++++++++---------
 .../java/org/apache/tajo/worker/TaskRunner.java |  9 ++-
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  9 ++-
 .../org/apache/tajo/rpc/NettyClientBase.java    |  6 ++
 7 files changed, 103 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9d2cd14..0d7222f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1405: Fix some illegal way of usages on connection pool. 
+    (Contributed by navis, Committed by Keuntae Park)
+
     TAJO-1384: Duplicated output file path problem. (jihoon)
 
     TAJO-1386: CURRENT_DATE generates parsing errors sometimes.

http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index d24e7b3..c084d95 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -57,7 +57,7 @@ public class SessionConnection implements Closeable {
 
   final RpcConnectionPool connPool;
 
-  private final String baseDatabase;
+  private String baseDatabase;
 
   private final UserRoleInfo userInfo;
 
@@ -260,7 +260,8 @@ public class SessionConnection implements Closeable {
   }
 
   public Boolean selectDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+    Boolean selected = new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -269,6 +270,11 @@ public class SessionConnection implements Closeable {
         return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
       }
     }.withRetries();
+
+    if (selected == Boolean.TRUE) {
+      this.baseDatabase = databaseName;
+    }
+    return selected;
   }
 
   @Override
@@ -278,13 +284,15 @@ public class SessionConnection implements Closeable {
     }
 
     // remove session
+    NettyClientBase client = null;
     try {
-
-      NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+      client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
       tajoMaster.removeSession(null, sessionId);
-
     } catch (Throwable e) {
+      // ignore
+    } finally {
+      connPool.releaseConnection(client);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index a645689..2377720 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -36,13 +36,11 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
-import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 
 import io.netty.channel.ConnectTimeoutException;
 import io.netty.channel.EventLoopGroup;
@@ -139,7 +137,17 @@ public class ExecutionBlockContext {
     try{
       this.resource.initialize(queryContext, plan);
     } catch (Throwable e) {
-      getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+      try {
+        NettyClientBase client = getQueryMasterConnection();
+        try {
+          QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+          stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+        } finally {
+          connPool.releaseConnection(client);
+        }
+      } catch (Throwable t) {
+        //ignore
+      }
       throw e;
     }
   }
@@ -148,15 +156,13 @@ public class ExecutionBlockContext {
     return resource;
   }
 
-  public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub()
+  public NettyClientBase getQueryMasterConnection()
       throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
-    NettyClientBase clientBase = null;
-    try {
-      clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
-      return clientBase.getStub();
-    } finally {
-      connPool.releaseConnection(clientBase);
-    }
+    return connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+  }
+
+  public void releaseConnection(NettyClientBase connection) {
+    connPool.releaseConnection(connection);
   }
 
   public void stop(){
@@ -267,7 +273,13 @@ public class ExecutionBlockContext {
   }
 
   private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
-    getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
+    NettyClientBase client = getQueryMasterConnection();
+    try {
+      QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+      stub.doneExecutionBlock(null, reporter, NullCallback.get());
+    } finally {
+      connPool.releaseConnection(client);
+    }
   }
 
   protected void reportExecutionBlock(ExecutionBlockId ebId) {
@@ -361,12 +373,14 @@ public class ExecutionBlockContext {
 
       return new Runnable() {
         int remainingRetries = MAX_RETRIES;
-        QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub;
         @Override
         public void run() {
           while (!reporterStop.get() && !Thread.interrupted()) {
+
+            NettyClientBase client = null;
             try {
-              masterStub = getQueryMasterStub();
+              client = getQueryMasterConnection();
+              QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();
 
               if(tasks.size() == 0){
                 masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
@@ -391,6 +405,7 @@ public class ExecutionBlockContext {
                 throw new RuntimeException(t);
               }
             } finally {
+              releaseConnection(client);
               if (remainingRetries > 0 && !reporterStop.get()) {
                 synchronized (reporterThread) {
                   try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 524b09b..9ff18dd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -52,6 +52,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -424,46 +425,52 @@ public class Task {
 
       executionBlockContext.completedTasksNum.incrementAndGet();
       context.getHashShuffleAppenderManager().finalizeTask(taskId);
-      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
-      if (context.isStopped()) {
-        context.setExecutorProgress(0.0f);
 
-        if(context.getState() == TaskAttemptState.TA_KILLED) {
-          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
-          executionBlockContext.killedTasksNum.incrementAndGet();
-        } else {
-          context.setState(TaskAttemptState.TA_FAILED);
-          TaskFatalErrorReport.Builder errorBuilder =
-              TaskFatalErrorReport.newBuilder()
-                  .setId(getId().getProto());
-          if (error != null) {
-            if (error.getMessage() == null) {
-              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
-            } else {
-              errorBuilder.setErrorMessage(error.getMessage());
+      NettyClientBase client = executionBlockContext.getQueryMasterConnection();
+      try {
+        QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
+        if (context.isStopped()) {
+          context.setExecutorProgress(0.0f);
+
+          if (context.getState() == TaskAttemptState.TA_KILLED) {
+            queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+            executionBlockContext.killedTasksNum.incrementAndGet();
+          } else {
+            context.setState(TaskAttemptState.TA_FAILED);
+            TaskFatalErrorReport.Builder errorBuilder =
+                TaskFatalErrorReport.newBuilder()
+                    .setId(getId().getProto());
+            if (error != null) {
+              if (error.getMessage() == null) {
+                errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
+              } else {
+                errorBuilder.setErrorMessage(error.getMessage());
+              }
+              errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
             }
-            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
+
+            queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+            executionBlockContext.failedTasksNum.incrementAndGet();
           }
+        } else {
+          // if successful
+          context.setProgress(1.0f);
+          context.setState(TaskAttemptState.TA_SUCCEEDED);
+          executionBlockContext.succeededTasksNum.incrementAndGet();
 
-          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
-          executionBlockContext.failedTasksNum.incrementAndGet();
+          TaskCompletionReport report = getTaskCompletionReport();
+          queryMasterStub.done(null, report, NullCallback.get());
         }
-      } else {
-        // if successful
-        context.setProgress(1.0f);
-        context.setState(TaskAttemptState.TA_SUCCEEDED);
-        executionBlockContext.succeededTasksNum.incrementAndGet();
-
-        TaskCompletionReport report = getTaskCompletionReport();
-        queryMasterStub.done(null, report, NullCallback.get());
+        finishTime = System.currentTimeMillis();
+        LOG.info(context.getTaskId() + " completed. " +
+            "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+            ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+            + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+            + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+        cleanupTask();
+      } finally {
+        executionBlockContext.releaseConnection(client);
       }
-      finishTime = System.currentTimeMillis();
-      LOG.info(context.getTaskId() + " completed. " +
-          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
-          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
-          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
-          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
-      cleanupTask();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 79725f6..058ea42 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -35,6 +35,7 @@ import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.container.TajoContainerIdPBImpl;
 import org.apache.tajo.master.container.TajoConverterUtils;
 import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 
 import io.netty.channel.ConnectTimeoutException;
@@ -196,9 +197,9 @@ public class TaskRunner extends AbstractService {
           TaskRequestProto taskRequest = null;
 
           while(!stopped) {
-            QueryMasterProtocolService.Interface qmClientService;
+            NettyClientBase client;
             try {
-              qmClientService = getContext().getQueryMasterStub();
+              client = executionBlockContext.getQueryMasterConnection();
             } catch (ConnectTimeoutException ce) {
               // NettyClientBase throws ConnectTimeoutException if connection was failed
               stop();
@@ -212,6 +213,8 @@ public class TaskRunner extends AbstractService {
               break;
             }
 
+            QueryMasterProtocolService.Interface qmClientService = client.getStub();
+
             try {
               if (callFuture == null) {
                 callFuture = new CallFuture<TaskRequestProto>();
@@ -296,6 +299,8 @@ public class TaskRunner extends AbstractService {
               }
             } catch (Throwable t) {
               LOG.fatal(t.getMessage(), t);
+            } finally {
+              executionBlockContext.releaseConnection(client);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 1ea9fb1..3d856ce 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -35,11 +35,12 @@ import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class AsyncRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
 
-  private final Map<Integer, ResponseCallback> requests =
+  private final ConcurrentMap<Integer, ResponseCallback> requests =
       new ConcurrentHashMap<Integer, ResponseCallback>();
 
   private final Method stubMethod;
@@ -178,14 +179,12 @@ public class AsyncRpcClient extends NettyClientBase {
   @ChannelHandler.Sharable
   private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
 
-    synchronized void registerCallback(int seqId, ResponseCallback callback) {
+    void registerCallback(int seqId, ResponseCallback callback) {
 
-      if (requests.containsKey(seqId)) {
+      if (requests.putIfAbsent(seqId, callback) != null) {
         throw new RemoteException(
             getErrorMessage("Duplicate Sequence Id "+ seqId));
       }
-
-      requests.put(seqId, callback);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 7dfc5a2..72278f2 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -132,6 +132,12 @@ public abstract class NettyClientBase implements Closeable {
     final CountDownLatch ticket = new CountDownLatch(1);
     final CountDownLatch granted = connect.check(ticket);
 
+    // basically, it's double checked lock
+    if (ticket == granted && isConnected()) {
+      granted.countDown();
+      return true;
+    }
+
     if (ticket == granted) {
       connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
     }