You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kylin.apache.org by GitBox <gi...@apache.org> on 2018/08/14 09:38:55 UTC

[GitHub] shaofengshi closed pull request #160: KYLIN-3434 Support prepare statement in Kylin server side

shaofengshi closed pull request #160: KYLIN-3434 Support prepare statement in Kylin server side
URL: https://github.com/apache/kylin/pull/160
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3ae6c2d48b..f4aad5e6bc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1377,6 +1377,14 @@ public String getQueryAccessController() {
         return getOptional("kylin.query.access-controller", null);
     }
 
+    public int getQueryMaxCacheStatementNum() {
+        return Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num", String.valueOf(50000)));
+    }
+
+    public int getQueryMaxCacheStatementInstancePerKey() {
+        return Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num-per-key", String.valueOf(50)));
+    }
+
     public int getDimCountDistinctMaxCardinality() {
         return Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct", "5000000"));
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3f4c576053..8d395af6a7 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -264,6 +264,7 @@ public CubeInstance updateCubeStatus(CubeInstance cube, RealizationStatusEnum ne
             cube = cube.latestCopyForWrite(); // get a latest copy
             CubeUpdate update = new CubeUpdate(cube);
             update.setStatus(newStatus);
+            ProjectManager.getInstance(config).touchProject(cube.getProject());
             return updateCube(update);
         }
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 5122fd8401..4f1de33c2b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -355,6 +355,18 @@ public void removeExtFilterFromProject(String filterName, String projectName) th
             save(projectInstance);
         }
     }
+
+    /**
+     * change the last project modify time
+     * @param projectName
+     * @throws IOException
+     */
+    public void touchProject(String projectName) throws IOException {
+        try (AutoLock lock = prjMapLock.lockForWrite()) {
+            ProjectInstance projectInstance = getProject(projectName);
+            save(projectInstance);
+        }
+    }
     
     private ProjectInstance save(ProjectInstance prj) throws IOException {
         crud.save(prj);
diff --git a/pom.xml b/pom.xml
index 854fd3b39e..1888a7f5e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
         <commons-upload.version>1.3.3</commons-upload.version>
         <commons-math3.version>3.1.1</commons-math3.version>
         <commons-collections.version>3.2.2</commons-collections.version>
+        <commons-pool.version>2.5.0</commons-pool.version>
 
         <!-- Calcite deps, keep compatible with calcite.version -->
         <jackson.version>2.9.5</jackson.version>
@@ -507,6 +508,11 @@
                 <version>${commons-collections.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-pool2</artifactId>
+                <version>${commons-pool.version}</version>
+            </dependency>
 
             <!-- HBase2 dependencies -->
             <dependency>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 455ae786f4..baa6433f7e 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -85,6 +85,10 @@
             <groupId>net.sf.supercsv</groupId>
             <artifactId>super-csv</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
index 97a486326a..48e382ae31 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
@@ -28,6 +28,7 @@
  * 
  */
 public class PrepareSqlRequest extends SQLRequest {
+    private boolean enableStatementCache = true;
 
     public PrepareSqlRequest() {
         super();
@@ -43,6 +44,14 @@ public void setParams(StateParam[] params) {
         this.params = params;
     }
 
+    public boolean isEnableStatementCache() {
+        return enableStatementCache;
+    }
+
+    public void setEnableStatementCache(boolean enableStatementCache) {
+        this.enableStatementCache = enableStatementCache;
+    }
+
     public static class StateParam implements Serializable {
         private String className;
         private String value;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4e3fe071cb..f195e74806 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -42,10 +42,15 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import javax.annotation.PostConstruct;
 
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.jdbc.CalcitePrepare;
@@ -55,6 +60,11 @@
 import org.apache.calcite.sql.type.BasicSqlType;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
@@ -113,10 +123,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-
 /**
  * @author xduo
  */
@@ -144,11 +150,26 @@
     @Autowired
     private AclEvaluate aclEvaluate;
 
+    private GenericKeyedObjectPool<PreparedContextKey, PreparedContext> preparedContextPool;
+
     public QueryService() {
         queryStore = ResourceStore.getStore(getConfig());
+        preparedContextPool = createPreparedContextPool();
         badQueryDetector.start();
     }
 
+    private GenericKeyedObjectPool<PreparedContextKey, PreparedContext> createPreparedContextPool() {
+        PreparedContextFactory factory = new PreparedContextFactory();
+        KylinConfig kylinConfig = getConfig();
+        GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig();
+        config.setMaxTotalPerKey(kylinConfig.getQueryMaxCacheStatementInstancePerKey());
+        config.setMaxTotal(kylinConfig.getQueryMaxCacheStatementNum());
+        config.setBlockWhenExhausted(false);
+        config.setMinEvictableIdleTimeMillis(10 * 60 * 1000L); // cached statement will be evict if idle for 10 minutes
+        GenericKeyedObjectPool<PreparedContextKey, PreparedContext> pool = new GenericKeyedObjectPool<>(factory, config);
+        return pool;
+    }
+
     protected static void close(ResultSet resultSet, Statement stat, Connection conn) {
         OLAPContext.clearParameter();
         DBUtils.closeQuietly(resultSet);
@@ -502,10 +523,13 @@ public SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
 
     private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
         Connection conn = null;
-
+        boolean isPrepareRequest = isPrepareStatementWithParams(sqlRequest);
+        boolean borrowPrepareContext = false;
+        PreparedContextKey preparedContextKey = null;
+        PreparedContext preparedContext = null;
+        
         try {
             conn = QueryConnection.getConnection(sqlRequest.getProject());
-
             String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
             QueryContext context = QueryContextFacade.current();
             context.setUsername(userInfo);
@@ -539,11 +563,45 @@ private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception
             OLAPContext.setParameters(parameters);
             // force clear the query context before a new query
             OLAPContext.clearThreadLocalContexts();
-
-            return execute(correctedSql, sqlRequest, conn);
+            
+            // special case for prepare query.
+            List<List<String>> results = Lists.newArrayList();
+            List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+            if (BackdoorToggles.getPrepareOnly()) {
+                return getPrepareOnlySqlResponse(correctedSql, conn, false, results, columnMetas);
+            }
+            if (!isPrepareRequest) {
+                return executeRequest(correctedSql, sqlRequest, conn);
+            } else {
+                long prjLastModifyTime = getProjectManager().getProject(sqlRequest.getProject()).getLastModified();
+                preparedContextKey = new PreparedContextKey(sqlRequest.getProject(), prjLastModifyTime, correctedSql);
+                PrepareSqlRequest prepareSqlRequest = (PrepareSqlRequest) sqlRequest;
+                if (prepareSqlRequest.isEnableStatementCache()) {
+                    try {
+                        preparedContext = preparedContextPool.borrowObject(preparedContextKey);
+                        borrowPrepareContext = true;
+                    } catch (NoSuchElementException noElementException) {
+                        borrowPrepareContext = false;
+                        preparedContext = createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+                    }
+                    for(OLAPContext olapContext : preparedContext.olapContexts) {
+                        OLAPContext.registerContext(olapContext);
+                    }
+                } else {
+                    preparedContext = createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+                }
+                return executePrepareRequest(correctedSql, prepareSqlRequest, preparedContext);
+            }
 
         } finally {
             DBUtils.closeQuietly(conn);
+            if (preparedContext != null) {
+                if (borrowPrepareContext) {
+                    preparedContextPool.returnObject(preparedContextKey, preparedContext);
+                } else {
+                    preparedContext.close();
+                }
+            }
         }
     }
 
@@ -783,83 +841,100 @@ protected void processStatementAttr(Statement s, SQLRequest sqlRequest) throws S
      * @return
      * @throws Exception
      */
-    private SQLResponse execute(String correctedSql, SQLRequest sqlRequest, Connection conn) throws Exception {
+    private SQLResponse executeRequest(String correctedSql, SQLRequest sqlRequest, Connection conn) throws Exception {
         Statement stat = null;
         ResultSet resultSet = null;
         boolean isPushDown = false;
 
-        List<List<String>> results = Lists.newArrayList();
-        List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
-
+        Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
         try {
+            stat = conn.createStatement();
+            processStatementAttr(stat, sqlRequest);
+            resultSet = stat.executeQuery(correctedSql);
 
-            // special case for prepare query.
-            if (BackdoorToggles.getPrepareOnly()) {
-                return getPrepareOnlySqlResponse(correctedSql, conn, isPushDown, results, columnMetas);
-            }
+            r = createResponseFromResultSet(resultSet); 
 
-            if (isPrepareStatementWithParams(sqlRequest)) {
+        } catch (SQLException sqlException) {
+            r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
+            if (r == null)
+                throw sqlException;
 
-                stat = conn.prepareStatement(correctedSql); // to be closed in the finally
-                PreparedStatement prepared = (PreparedStatement) stat;
-                processStatementAttr(prepared, sqlRequest);
-                for (int i = 0; i < ((PrepareSqlRequest) sqlRequest).getParams().length; i++) {
-                    setParam(prepared, i + 1, ((PrepareSqlRequest) sqlRequest).getParams()[i]);
-                }
-                resultSet = prepared.executeQuery();
-            } else {
-                stat = conn.createStatement();
-                processStatementAttr(stat, sqlRequest);
-                resultSet = stat.executeQuery(correctedSql);
-            }
+            isPushDown = true;
+        } finally {
+            close(resultSet, stat, null); //conn is passed in, not my duty to close
+        }
 
-            ResultSetMetaData metaData = resultSet.getMetaData();
-            int columnCount = metaData.getColumnCount();
-
-            // Fill in selected column meta
-            for (int i = 1; i <= columnCount; ++i) {
-                columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i),
-                        metaData.isSearchable(i), metaData.isCurrency(i), metaData.isNullable(i), metaData.isSigned(i),
-                        metaData.getColumnDisplaySize(i), metaData.getColumnLabel(i), metaData.getColumnName(i),
-                        metaData.getSchemaName(i), metaData.getCatalogName(i), metaData.getTableName(i),
-                        metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i),
-                        metaData.getColumnTypeName(i), metaData.isReadOnly(i), metaData.isWritable(i),
-                        metaData.isDefinitelyWritable(i)));
-            }
+        return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+    }
 
-            // fill in results
-            while (resultSet.next()) {
-                List<String> oneRow = Lists.newArrayListWithCapacity(columnCount);
-                for (int i = 0; i < columnCount; i++) {
-                    oneRow.add((resultSet.getString(i + 1)));
-                }
+    private SQLResponse executePrepareRequest(String correctedSql, PrepareSqlRequest sqlRequest, PreparedContext preparedContext)
+            throws Exception {
+        ResultSet resultSet = null;
+        boolean isPushDown = false;
 
-                results.add(oneRow);
+        Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
+        Connection conn = preparedContext.conn;
+        try {
+            PreparedStatement preparedStatement = preparedContext.preparedStatement;
+            processStatementAttr(preparedStatement, sqlRequest);
+            for (int i = 0; i < sqlRequest.getParams().length; i++) {
+                setParam(preparedStatement, i + 1, (sqlRequest.getParams()[i]));
             }
-
+            resultSet = preparedStatement.executeQuery();
+            r = createResponseFromResultSet(resultSet);
         } catch (SQLException sqlException) {
-            Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
-            try {
-                r = PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(),
-                        sqlException, BackdoorToggles.getPrepareOnly());
-            } catch (Exception e2) {
-                logger.error("pushdown engine failed current query too", e2);
-                //exception in pushdown, throw it instead of exception in calcite
-                throw e2;
-            }
-
+            r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
             if (r == null)
                 throw sqlException;
 
             isPushDown = true;
-            results = r.getFirst();
-            columnMetas = r.getSecond();
-
         } finally {
-            close(resultSet, stat, null); //conn is passed in, not my duty to close
+            DBUtils.closeQuietly(resultSet);
         }
 
-        return buildSqlResponse(isPushDown, results, columnMetas);
+        return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+    }
+
+    private Pair<List<List<String>>, List<SelectedColumnMeta>> pushDownQuery(SQLRequest sqlRequest, String correctedSql, Connection conn, SQLException sqlException) throws Exception{
+        try {
+            return PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(),
+                    sqlException, BackdoorToggles.getPrepareOnly());
+        } catch (Exception e2) {
+            logger.error("pushdown engine failed current query too", e2);
+            //exception in pushdown, throw it instead of exception in calcite
+            throw e2;
+        }
+    }
+
+    private Pair<List<List<String>>, List<SelectedColumnMeta>> createResponseFromResultSet(ResultSet resultSet)
+            throws Exception {
+        List<List<String>> results = Lists.newArrayList();
+        List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        int columnCount = metaData.getColumnCount();
+
+        // Fill in selected column meta
+        for (int i = 1; i <= columnCount; ++i) {
+            columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), metaData
+                    .isSearchable(i), metaData.isCurrency(i), metaData.isNullable(i), metaData.isSigned(i), metaData
+                    .getColumnDisplaySize(i), metaData.getColumnLabel(i), metaData.getColumnName(i), metaData
+                    .getSchemaName(i), metaData.getCatalogName(i), metaData.getTableName(i), metaData.getPrecision(i),
+                    metaData.getScale(i), metaData.getColumnType(i), metaData.getColumnTypeName(i), metaData
+                            .isReadOnly(i), metaData.isWritable(i), metaData.isDefinitelyWritable(i)));
+        }
+
+        // fill in results
+        while (resultSet.next()) {
+            List<String> oneRow = Lists.newArrayListWithCapacity(columnCount);
+            for (int i = 0; i < columnCount; i++) {
+                oneRow.add((resultSet.getString(i + 1)));
+            }
+
+            results.add(oneRow);
+        }
+
+        return new Pair<>(results, columnMetas);
     }
 
     protected String makeErrorMsgUserFriendly(Throwable e) {
@@ -1044,6 +1119,13 @@ public void setCacheManager(CacheManager cacheManager) {
         this.cacheManager = cacheManager;
     }
 
+    private static PreparedContext createPreparedContext(String project, String sql) throws Exception{
+        Connection conn = QueryConnection.getConnection(project);
+        PreparedStatement preparedStatement = conn.prepareStatement(sql);
+        Collection<OLAPContext> olapContexts = OLAPContext.getThreadLocalContexts();
+        return new PreparedContext(conn, preparedStatement, olapContexts);
+    }
+
     private static class QueryRecordSerializer implements Serializer<QueryRecord> {
 
         private static final QueryRecordSerializer serializer = new QueryRecordSerializer();
@@ -1069,6 +1151,86 @@ public QueryRecord deserialize(DataInputStream in) throws IOException {
         }
     }
 
+    private static class PreparedContextFactory extends
+            BaseKeyedPooledObjectFactory<PreparedContextKey, PreparedContext> {
+
+        @Override
+        public PreparedContext create(PreparedContextKey key) throws Exception {
+            return createPreparedContext(key.project, key.sql);
+        }
+
+        @Override
+        public PooledObject<PreparedContext> wrap(PreparedContext value) {
+            return new DefaultPooledObject<>(value);
+        }
+
+        @Override
+        public void destroyObject(final PreparedContextKey key, final PooledObject<PreparedContext> p) {
+            PreparedContext cachedContext = p.getObject();
+            cachedContext.close();
+        }
+
+        @Override
+        public boolean validateObject(final PreparedContextKey key, final PooledObject<PreparedContext> p) {
+            return true;
+        }
+    }
+
+    private static class PreparedContextKey {
+        private String project;
+        private long prjLastModifyTime;
+        private String sql;
+
+        public PreparedContextKey(String project, long prjLastModifyTime, String sql) {
+            this.project = project;
+            this.prjLastModifyTime = prjLastModifyTime;
+            this.sql = sql;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            PreparedContextKey that = (PreparedContextKey) o;
+
+            if (prjLastModifyTime != that.prjLastModifyTime) return false;
+            if (project != null ? !project.equals(that.project) : that.project != null) return false;
+            return sql != null ? sql.equals(that.sql) : that.sql == null;
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = project != null ? project.hashCode() : 0;
+            result = 31 * result + (int) (prjLastModifyTime ^ (prjLastModifyTime >>> 32));
+            result = 31 * result + (sql != null ? sql.hashCode() : 0);
+            return result;
+        }
+    }
+
+    private static class PreparedContext {
+        private Connection conn;
+        private PreparedStatement preparedStatement;
+        private Collection<OLAPContext> olapContexts;
+
+        public PreparedContext(Connection conn, PreparedStatement preparedStatement,
+                               Collection<OLAPContext> olapContexts) {
+            this.conn = conn;
+            this.preparedStatement = preparedStatement;
+            this.olapContexts = olapContexts;
+        }
+
+        public void close() {
+            if (conn != null) {
+                DBUtils.closeQuietly(conn);
+            }
+            if (preparedStatement != null) {
+                DBUtils.closeQuietly(preparedStatement);
+            }
+        }
+    }
+
 }
 
 @SuppressWarnings("serial")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services