You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2016/10/10 08:53:01 UTC

lens git commit: LENS-1345: Fixing deadlock in jdbc query status update flow

Repository: lens
Updated Branches:
  refs/heads/master 61ee6bfc8 -> 9ef7ce736


LENS-1345: Fixing deadlock in jdbc query status update flow


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/9ef7ce73
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/9ef7ce73
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/9ef7ce73

Branch: refs/heads/master
Commit: 9ef7ce73693d039ab5a197b4227c5c09c2efcaa4
Parents: 61ee6bf
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Mon Oct 10 14:20:31 2016 +0530
Committer: Rajat Khandelwal <ra...@gmail.com>
Committed: Mon Oct 10 14:20:31 2016 +0530

----------------------------------------------------------------------
 .../org/apache/lens/driver/jdbc/JDBCDriver.java | 58 ++++++++------------
 .../apache/lens/driver/jdbc/TestJdbcDriver.java |  3 +
 .../server/api/driver/AbstractLensDriver.java   |  4 ++
 .../server/api/driver/DriverQueryStatus.java    |  7 ++-
 .../lens/server/api/query/QueryContext.java     | 20 ++++---
 .../server/query/QueryExecutionServiceImpl.java |  5 +-
 .../lens/server/common/RestAPITestUtil.java     |  6 +-
 .../lens/server/query/TestQueryService.java     | 22 ++++++--
 8 files changed, 72 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
index f805ec6..e41077c 100644
--- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
+++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
@@ -292,11 +292,11 @@ public class JDBCDriver extends AbstractLensDriver {
             if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
               return result;
             }
-            queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable);
-            queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL);
             if (isResultAvailable) {
               result.resultSet = stmt.getResultSet();
             }
+            queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable);
+            queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL);
           } catch (Exception e) {
             if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
               return result;
@@ -887,7 +887,6 @@ public class JDBCDriver extends AbstractLensDriver {
     queryContext.setPrepared(false);
     queryContext.setRewrittenQuery(rewrittenQuery);
     return new QueryCallable(queryContext, logSegregationContext).call();
-    // LOG.info("Execute " + context.getQueryHandle());
   }
 
   /**
@@ -930,51 +929,40 @@ public class JDBCDriver extends AbstractLensDriver {
       return;
     }
     if (ctx.getResultFuture().isCancelled()) {
-      context.getDriverStatus().setProgress(1.0);
-      context.getDriverStatus().setState(DriverQueryState.CANCELED);
-      context.getDriverStatus().setStatusMessage("Query Canceled");
+      if (!context.getDriverStatus().isCanceled()) {
+        context.getDriverStatus().setProgress(1.0);
+        context.getDriverStatus().setState(DriverQueryState.CANCELED);
+        context.getDriverStatus().setStatusMessage("Query Canceled");
+      }
     } else if (ctx.getResultFuture().isDone()) {
       context.getDriverStatus().setProgress(1.0);
       // Since future is already done, this call should not block
       if (ctx.getQueryResult() != null && ctx.getQueryResult().error != null) {
-        context.getDriverStatus().setState(DriverQueryState.FAILED);
-        context.getDriverStatus().setStatusMessage("Query execution failed!");
-        context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage());
+        if (!context.getDriverStatus().isFailed()) {
+          context.getDriverStatus().setState(DriverQueryState.FAILED);
+          context.getDriverStatus().setStatusMessage("Query execution failed!");
+          context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage());
+        }
       } else {
-        context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL);
-        context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " successful");
-        context.getDriverStatus().setResultSetAvailable(true);
+        if (!context.getDriverStatus().isFinished()) {
+          // assuming successful
+          context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL);
+          context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " successful");
+          context.getDriverStatus().setResultSetAvailable(true);
+        }
       }
     } else {
-      context.getDriverStatus().setState(DriverQueryState.RUNNING);
-      context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running");
+      if (!context.getDriverStatus().isRunning()) {
+        context.getDriverStatus().setState(DriverQueryState.RUNNING);
+        context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running");
+      }
     }
   }
 
   @Override
   protected LensResultSet createResultSet(QueryContext ctx) throws LensException {
     checkConfigured();
-    return getDriverResult(ctx);
-  }
-
-  private LensResultSet getDriverResult(QueryContext context) throws LensException {
-    JdbcQueryContext ctx = getQueryContext(context.getQueryHandle());
-    if (ctx.getLensContext().getDriverStatus().isCanceled()) {
-      throw new LensException("Result set not available for canceled query " + context.getQueryHandle());
-    }
-
-    Future<QueryResult> future = ctx.getResultFuture();
-    QueryHandle queryHandle = context.getQueryHandle();
-
-    try {
-      return future.get().getLensResultSet(true);
-    } catch (InterruptedException e) {
-      throw new LensException("Interrupted while getting resultset for query " + queryHandle.getHandleId(), e);
-    } catch (ExecutionException e) {
-      throw new LensException("Error while executing query " + queryHandle.getHandleId() + " in background", e);
-    } catch (CancellationException e) {
-      throw new LensException("Query was already canceled " + queryHandle.getHandleId(), e);
-    }
+    return getQueryContext(ctx.getQueryHandle()).getQueryResult().getLensResultSet(true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
index 6e9086f..2ad7f76 100644
--- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
+++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
@@ -496,6 +496,9 @@ public class TestJdbcDriver {
     QueryContext context = createQueryContext(query, conf);
     context.setExecuteTimeoutMillis(executeTimeoutMillis);
     driver.executeAsync(context);
+    while (!context.getDriverStatus().isFinished()) {
+      Thread.sleep(1000);
+    }
     LensResultSet resultSet = driver.fetchResultSet(context);
     assertNotNull(resultSet);
 

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
index e498479..365a619 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
@@ -66,6 +66,10 @@ public abstract class AbstractLensDriver implements LensDriver {
   @Override
   public LensResultSet fetchResultSet(QueryContext ctx) throws LensException {
     log.info("FetchResultSet: {}", ctx.getQueryHandle());
+    if (!ctx.getDriverStatus().isSuccessful()) {
+      throw new LensException("Can't fetch results for a " + ctx.getQueryHandleString() + " because it's status is "
+        + ctx.getStatus());
+    }
     ctx.registerDriverResult(createResultSet(ctx)); // registerDriverResult makes sure registration happens ony once
     return ctx.getDriverResult();
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
index 033f677..fc24fc6 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
@@ -218,5 +218,10 @@ public class DriverQueryStatus implements Serializable {
   public boolean isCanceled() {
     return state.equals(DriverQueryState.CANCELED);
   }
-
+  public boolean isFailed() {
+    return state.equals(DriverQueryState.FAILED);
+  }
+  public boolean isRunning() {
+    return state.equals(DriverQueryState.RUNNING);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
index b584c6a..d0662f4 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
@@ -23,10 +23,7 @@ import static org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INME
 import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET;
 import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Future;
 
 import org.apache.lens.api.LensConf;
@@ -203,7 +200,7 @@ public class QueryContext extends AbstractQueryContext {
   @Getter
   @Setter
   private transient Future queryLauncher;
-  private List<QueryDriverStatusUpdateListener> driverStatusUpdateListener = Lists.newArrayList();
+  private final List<QueryDriverStatusUpdateListener> driverStatusUpdateListeners = Lists.newArrayList();
 
   /**
    * Creates context from query
@@ -469,6 +466,9 @@ public class QueryContext extends AbstractQueryContext {
   public boolean successful() {
     return this.status.successful();
   }
+  public boolean executed() {
+    return this.status.executed();
+  }
 
   public boolean launched() {
     return this.status.launched();
@@ -558,8 +558,10 @@ public class QueryContext extends AbstractQueryContext {
       getDriverStatus().setStatusMessage("Query " + getQueryHandleString() + " " + state.name().toLowerCase());
     }
     getDriverStatus().setState(state);
-    for (QueryDriverStatusUpdateListener listener: this.driverStatusUpdateListener) {
-      listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus());
+    synchronized (this.driverStatusUpdateListeners) {
+      for (QueryDriverStatusUpdateListener listener : this.driverStatusUpdateListeners) {
+        listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus());
+      }
     }
   }
 
@@ -573,6 +575,8 @@ public class QueryContext extends AbstractQueryContext {
 
 
   public void registerStatusUpdateListener(QueryDriverStatusUpdateListener driverStatusUpdateListener) {
-    this.driverStatusUpdateListener.add(driverStatusUpdateListener);
+    synchronized (this.driverStatusUpdateListeners) {
+      this.driverStatusUpdateListeners.add(driverStatusUpdateListener);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 87d7cb0..cb5961f 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -582,12 +582,11 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
           driverRS = ctx.getSelectedDriver().fetchResultSet(getCtx());
         } catch (Exception e) {
           log.error(
-              "Error while getting result set form driver {}. Driver result set based purging logic will be ignored",
-              ctx.getSelectedDriver(), e);
+            "Error while getting result set form driver {}. Driver result set based purging logic will be ignored",
+            ctx.getSelectedDriver(), e);
         }
       }
     }
-
     public boolean canBePurged() {
       try {
         if (getCtx().getStatus().getStatus().equals(SUCCESSFUL) && getCtx().getStatus().isResultSetAvailable()) {

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
index 57786e6..02e2f8b 100644
--- a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
+++ b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
@@ -249,9 +249,13 @@ public class RestAPITestUtil {
 
   public static PersistentQueryResult getLensQueryResult(final WebTarget target,
     final LensSessionHandle lensSessionHandle, final QueryHandle handle, MediaType mt) throws InterruptedException {
+    return getLensQueryResult(target, lensSessionHandle, handle, PersistentQueryResult.class, mt);
+  }
+  public static <T> T getLensQueryResult(final WebTarget target, final LensSessionHandle lensSessionHandle,
+    final QueryHandle handle, Class<T> clazz, MediaType mt) throws InterruptedException {
     waitForQueryToFinish(target, lensSessionHandle, handle, QueryStatus.Status.SUCCESSFUL, mt);
     return target.path("queryapi/queries").path(handle.toString()).path("resultset")
-      .queryParam("sessionid", lensSessionHandle).request(mt).get(PersistentQueryResult.class);
+      .queryParam("sessionid", lensSessionHandle).request(mt).get(clazz);
   }
 
   public static Response getLensQueryHttpResult(final WebTarget target, final LensSessionHandle lensSessionHandle,

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index 3f71aef..440c30b 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -66,6 +66,7 @@ import org.apache.lens.server.api.query.QueryContext;
 import org.apache.lens.server.api.query.QueryExecutionService;
 import org.apache.lens.server.api.session.SessionService;
 import org.apache.lens.server.common.ErrorResponseExpectedData;
+import org.apache.lens.server.common.RestAPITestUtil;
 import org.apache.lens.server.common.TestDataUtils;
 import org.apache.lens.server.common.TestResourceFile;
 import org.apache.lens.server.error.GenericExceptionMapper;
@@ -1426,6 +1427,17 @@ public class TestQueryService extends LensJerseyTest {
     };
   }
 
+  @Test
+  public void testExecuteAsyncJDBCQuery() throws InterruptedException {
+    String query = "select ID, IDSTR from " + TEST_JDBC_TABLE;
+    QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of(query),
+      Optional.of(getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, false)), APPLICATION_XML_TYPE);
+    // fetch results so that it can be purged
+    InMemoryQueryResult queryResult = RestAPITestUtil.getLensQueryResult(target(), lensSessionId, handle,
+      InMemoryQueryResult.class, APPLICATION_XML_TYPE);
+    assertEquals(queryResult.getRows().size(), 5);
+  }
+
   /**
    * @param timeOutMillis : wait time for execute with timeout api
    * @param preFetchRows : number of rows to pre-fetch in case of InMemoryResultSet
@@ -1457,7 +1469,7 @@ public class TestQueryService extends LensJerseyTest {
     conf.addProperty("deferPersistenceByMillis", deferPersistenceByMillis); // property used for test only
     mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf,
         APPLICATION_XML_TYPE));
-    QueryHandleWithResultSet result =target.request(APPLICATION_XML_TYPE)
+    QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE)
             .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE),
                 new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData();
     QueryHandle handle = result.getQueryHandle();
@@ -1945,16 +1957,16 @@ public class TestQueryService extends LensJerseyTest {
     WebTarget target = target().path("queryapi/queries");
     final FormDataMultiPart mp = new FormDataMultiPart();
     mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId,
-      MediaType.APPLICATION_XML_TYPE));
+      APPLICATION_XML_TYPE));
     mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from "
       + TEST_TABLE));
     mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout"));
     mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), "300000"));
     mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("queryName").build(), queryName.toString()));
     mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(),
-      MediaType.APPLICATION_XML_TYPE));
+      APPLICATION_XML_TYPE));
 
-    QueryHandleWithResultSet result = target.request(MediaType.APPLICATION_XML_TYPE)
+    QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE)
       .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE),
         new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData();
     assertNotNull(result.getQueryHandle());
@@ -1963,7 +1975,7 @@ public class TestQueryService extends LensJerseyTest {
     target = target().path("queryapi/queries/detail");
     List<LensQuery> results = target.queryParam("queryName", queryName)
       .queryParam("sessionid", lensSessionId)
-      .request(MediaType.APPLICATION_XML_TYPE)
+      .request(APPLICATION_XML_TYPE)
       .get(new GenericType<List<LensQuery>>(){});
     Assert.assertNotNull(results);
     Assert.assertEquals(1, results.size());