You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2016/10/10 08:53:01 UTC
lens git commit: LENS-1345: Fixing deadlock in jdbc query status
update flow
Repository: lens
Updated Branches:
refs/heads/master 61ee6bfc8 -> 9ef7ce736
LENS-1345: Fixing deadlock in jdbc query status update flow
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/9ef7ce73
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/9ef7ce73
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/9ef7ce73
Branch: refs/heads/master
Commit: 9ef7ce73693d039ab5a197b4227c5c09c2efcaa4
Parents: 61ee6bf
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Mon Oct 10 14:20:31 2016 +0530
Committer: Rajat Khandelwal <ra...@gmail.com>
Committed: Mon Oct 10 14:20:31 2016 +0530
----------------------------------------------------------------------
.../org/apache/lens/driver/jdbc/JDBCDriver.java | 58 ++++++++------------
.../apache/lens/driver/jdbc/TestJdbcDriver.java | 3 +
.../server/api/driver/AbstractLensDriver.java | 4 ++
.../server/api/driver/DriverQueryStatus.java | 7 ++-
.../lens/server/api/query/QueryContext.java | 20 ++++---
.../server/query/QueryExecutionServiceImpl.java | 5 +-
.../lens/server/common/RestAPITestUtil.java | 6 +-
.../lens/server/query/TestQueryService.java | 22 ++++++--
8 files changed, 72 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
index f805ec6..e41077c 100644
--- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
+++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
@@ -292,11 +292,11 @@ public class JDBCDriver extends AbstractLensDriver {
if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
return result;
}
- queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable);
- queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL);
if (isResultAvailable) {
result.resultSet = stmt.getResultSet();
}
+ queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable);
+ queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL);
} catch (Exception e) {
if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
return result;
@@ -887,7 +887,6 @@ public class JDBCDriver extends AbstractLensDriver {
queryContext.setPrepared(false);
queryContext.setRewrittenQuery(rewrittenQuery);
return new QueryCallable(queryContext, logSegregationContext).call();
- // LOG.info("Execute " + context.getQueryHandle());
}
/**
@@ -930,51 +929,40 @@ public class JDBCDriver extends AbstractLensDriver {
return;
}
if (ctx.getResultFuture().isCancelled()) {
- context.getDriverStatus().setProgress(1.0);
- context.getDriverStatus().setState(DriverQueryState.CANCELED);
- context.getDriverStatus().setStatusMessage("Query Canceled");
+ if (!context.getDriverStatus().isCanceled()) {
+ context.getDriverStatus().setProgress(1.0);
+ context.getDriverStatus().setState(DriverQueryState.CANCELED);
+ context.getDriverStatus().setStatusMessage("Query Canceled");
+ }
} else if (ctx.getResultFuture().isDone()) {
context.getDriverStatus().setProgress(1.0);
// Since future is already done, this call should not block
if (ctx.getQueryResult() != null && ctx.getQueryResult().error != null) {
- context.getDriverStatus().setState(DriverQueryState.FAILED);
- context.getDriverStatus().setStatusMessage("Query execution failed!");
- context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage());
+ if (!context.getDriverStatus().isFailed()) {
+ context.getDriverStatus().setState(DriverQueryState.FAILED);
+ context.getDriverStatus().setStatusMessage("Query execution failed!");
+ context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage());
+ }
} else {
- context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL);
- context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " successful");
- context.getDriverStatus().setResultSetAvailable(true);
+ if (!context.getDriverStatus().isFinished()) {
+ // assuming successful
+ context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL);
+ context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " successful");
+ context.getDriverStatus().setResultSetAvailable(true);
+ }
}
} else {
- context.getDriverStatus().setState(DriverQueryState.RUNNING);
- context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running");
+ if (!context.getDriverStatus().isRunning()) {
+ context.getDriverStatus().setState(DriverQueryState.RUNNING);
+ context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running");
+ }
}
}
@Override
protected LensResultSet createResultSet(QueryContext ctx) throws LensException {
checkConfigured();
- return getDriverResult(ctx);
- }
-
- private LensResultSet getDriverResult(QueryContext context) throws LensException {
- JdbcQueryContext ctx = getQueryContext(context.getQueryHandle());
- if (ctx.getLensContext().getDriverStatus().isCanceled()) {
- throw new LensException("Result set not available for canceled query " + context.getQueryHandle());
- }
-
- Future<QueryResult> future = ctx.getResultFuture();
- QueryHandle queryHandle = context.getQueryHandle();
-
- try {
- return future.get().getLensResultSet(true);
- } catch (InterruptedException e) {
- throw new LensException("Interrupted while getting resultset for query " + queryHandle.getHandleId(), e);
- } catch (ExecutionException e) {
- throw new LensException("Error while executing query " + queryHandle.getHandleId() + " in background", e);
- } catch (CancellationException e) {
- throw new LensException("Query was already canceled " + queryHandle.getHandleId(), e);
- }
+ return getQueryContext(ctx.getQueryHandle()).getQueryResult().getLensResultSet(true);
}
/**
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
index 6e9086f..2ad7f76 100644
--- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
+++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
@@ -496,6 +496,9 @@ public class TestJdbcDriver {
QueryContext context = createQueryContext(query, conf);
context.setExecuteTimeoutMillis(executeTimeoutMillis);
driver.executeAsync(context);
+ while (!context.getDriverStatus().isFinished()) {
+ Thread.sleep(1000);
+ }
LensResultSet resultSet = driver.fetchResultSet(context);
assertNotNull(resultSet);
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
index e498479..365a619 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
@@ -66,6 +66,10 @@ public abstract class AbstractLensDriver implements LensDriver {
@Override
public LensResultSet fetchResultSet(QueryContext ctx) throws LensException {
log.info("FetchResultSet: {}", ctx.getQueryHandle());
+ if (!ctx.getDriverStatus().isSuccessful()) {
+ throw new LensException("Can't fetch results for a " + ctx.getQueryHandleString() + " because it's status is "
+ + ctx.getStatus());
+ }
ctx.registerDriverResult(createResultSet(ctx)); // registerDriverResult makes sure registration happens ony once
return ctx.getDriverResult();
}
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
index 033f677..fc24fc6 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
@@ -218,5 +218,10 @@ public class DriverQueryStatus implements Serializable {
public boolean isCanceled() {
return state.equals(DriverQueryState.CANCELED);
}
-
+ public boolean isFailed() {
+ return state.equals(DriverQueryState.FAILED);
+ }
+ public boolean isRunning() {
+ return state.equals(DriverQueryState.RUNNING);
+ }
}
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
index b584c6a..d0662f4 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
@@ -23,10 +23,7 @@ import static org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INME
import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET;
import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Future;
import org.apache.lens.api.LensConf;
@@ -203,7 +200,7 @@ public class QueryContext extends AbstractQueryContext {
@Getter
@Setter
private transient Future queryLauncher;
- private List<QueryDriverStatusUpdateListener> driverStatusUpdateListener = Lists.newArrayList();
+ private final List<QueryDriverStatusUpdateListener> driverStatusUpdateListeners = Lists.newArrayList();
/**
* Creates context from query
@@ -469,6 +466,9 @@ public class QueryContext extends AbstractQueryContext {
public boolean successful() {
return this.status.successful();
}
+ public boolean executed() {
+ return this.status.executed();
+ }
public boolean launched() {
return this.status.launched();
@@ -558,8 +558,10 @@ public class QueryContext extends AbstractQueryContext {
getDriverStatus().setStatusMessage("Query " + getQueryHandleString() + " " + state.name().toLowerCase());
}
getDriverStatus().setState(state);
- for (QueryDriverStatusUpdateListener listener: this.driverStatusUpdateListener) {
- listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus());
+ synchronized (this.driverStatusUpdateListeners) {
+ for (QueryDriverStatusUpdateListener listener : this.driverStatusUpdateListeners) {
+ listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus());
+ }
}
}
@@ -573,6 +575,8 @@ public class QueryContext extends AbstractQueryContext {
public void registerStatusUpdateListener(QueryDriverStatusUpdateListener driverStatusUpdateListener) {
- this.driverStatusUpdateListener.add(driverStatusUpdateListener);
+ synchronized (this.driverStatusUpdateListeners) {
+ this.driverStatusUpdateListeners.add(driverStatusUpdateListener);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 87d7cb0..cb5961f 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -582,12 +582,11 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
driverRS = ctx.getSelectedDriver().fetchResultSet(getCtx());
} catch (Exception e) {
log.error(
- "Error while getting result set form driver {}. Driver result set based purging logic will be ignored",
- ctx.getSelectedDriver(), e);
+ "Error while getting result set form driver {}. Driver result set based purging logic will be ignored",
+ ctx.getSelectedDriver(), e);
}
}
}
-
public boolean canBePurged() {
try {
if (getCtx().getStatus().getStatus().equals(SUCCESSFUL) && getCtx().getStatus().isResultSetAvailable()) {
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
index 57786e6..02e2f8b 100644
--- a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
+++ b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
@@ -249,9 +249,13 @@ public class RestAPITestUtil {
public static PersistentQueryResult getLensQueryResult(final WebTarget target,
final LensSessionHandle lensSessionHandle, final QueryHandle handle, MediaType mt) throws InterruptedException {
+ return getLensQueryResult(target, lensSessionHandle, handle, PersistentQueryResult.class, mt);
+ }
+ public static <T> T getLensQueryResult(final WebTarget target, final LensSessionHandle lensSessionHandle,
+ final QueryHandle handle, Class<T> clazz, MediaType mt) throws InterruptedException {
waitForQueryToFinish(target, lensSessionHandle, handle, QueryStatus.Status.SUCCESSFUL, mt);
return target.path("queryapi/queries").path(handle.toString()).path("resultset")
- .queryParam("sessionid", lensSessionHandle).request(mt).get(PersistentQueryResult.class);
+ .queryParam("sessionid", lensSessionHandle).request(mt).get(clazz);
}
public static Response getLensQueryHttpResult(final WebTarget target, final LensSessionHandle lensSessionHandle,
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index 3f71aef..440c30b 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -66,6 +66,7 @@ import org.apache.lens.server.api.query.QueryContext;
import org.apache.lens.server.api.query.QueryExecutionService;
import org.apache.lens.server.api.session.SessionService;
import org.apache.lens.server.common.ErrorResponseExpectedData;
+import org.apache.lens.server.common.RestAPITestUtil;
import org.apache.lens.server.common.TestDataUtils;
import org.apache.lens.server.common.TestResourceFile;
import org.apache.lens.server.error.GenericExceptionMapper;
@@ -1426,6 +1427,17 @@ public class TestQueryService extends LensJerseyTest {
};
}
+ @Test
+ public void testExecuteAsyncJDBCQuery() throws InterruptedException {
+ String query = "select ID, IDSTR from " + TEST_JDBC_TABLE;
+ QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of(query),
+ Optional.of(getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, false)), APPLICATION_XML_TYPE);
+ // fetch results so that it can be purged
+ InMemoryQueryResult queryResult = RestAPITestUtil.getLensQueryResult(target(), lensSessionId, handle,
+ InMemoryQueryResult.class, APPLICATION_XML_TYPE);
+ assertEquals(queryResult.getRows().size(), 5);
+ }
+
/**
* @param timeOutMillis : wait time for execute with timeout api
* @param preFetchRows : number of rows to pre-fetch in case of InMemoryResultSet
@@ -1457,7 +1469,7 @@ public class TestQueryService extends LensJerseyTest {
conf.addProperty("deferPersistenceByMillis", deferPersistenceByMillis); // property used for test only
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf,
APPLICATION_XML_TYPE));
- QueryHandleWithResultSet result =target.request(APPLICATION_XML_TYPE)
+ QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE)
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE),
new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData();
QueryHandle handle = result.getQueryHandle();
@@ -1945,16 +1957,16 @@ public class TestQueryService extends LensJerseyTest {
WebTarget target = target().path("queryapi/queries");
final FormDataMultiPart mp = new FormDataMultiPart();
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId,
- MediaType.APPLICATION_XML_TYPE));
+ APPLICATION_XML_TYPE));
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from "
+ TEST_TABLE));
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout"));
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), "300000"));
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("queryName").build(), queryName.toString()));
mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(),
- MediaType.APPLICATION_XML_TYPE));
+ APPLICATION_XML_TYPE));
- QueryHandleWithResultSet result = target.request(MediaType.APPLICATION_XML_TYPE)
+ QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE)
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE),
new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData();
assertNotNull(result.getQueryHandle());
@@ -1963,7 +1975,7 @@ public class TestQueryService extends LensJerseyTest {
target = target().path("queryapi/queries/detail");
List<LensQuery> results = target.queryParam("queryName", queryName)
.queryParam("sessionid", lensSessionId)
- .request(MediaType.APPLICATION_XML_TYPE)
+ .request(APPLICATION_XML_TYPE)
.get(new GenericType<List<LensQuery>>(){});
Assert.assertNotNull(results);
Assert.assertEquals(1, results.size());