You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/08/26 11:35:56 UTC

[GitHub] [incubator-doris] morningman commented on a change in pull request #4284: [Apache][Doris][Feature][FE]Support users with high QPS in the same query through result set cache

morningman commented on a change in pull request #4284:
URL: https://github.com/apache/incubator-doris/pull/4284#discussion_r477230172



##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
##########
@@ -582,31 +592,55 @@ private void handleSetStmt() {
     }
 
     // Process a select statement.
-    private void handleQueryStmt() throws Exception {
+    private boolean handleQueryStmt() throws Exception {

Review comment:
       Add comment to explain the return value.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
##########
@@ -582,31 +592,55 @@ private void handleSetStmt() {
     }
 
     // Process a select statement.
-    private void handleQueryStmt() throws Exception {
+    private boolean handleQueryStmt() throws Exception {
         // Every time set no send flag and clean all data in buffer
         context.getMysqlChannel().reset();
         QueryStmt queryStmt = (QueryStmt) parsedStmt;
+        // Use connection ID as session identifier
+        NamedKey namedKey = new NamedKey(String.valueOf(context.getConnectionId()),
+                StringUtils.toUtf8(queryStmt.toSql()));
+        LOG.debug("Result Cache NamedKey [" + namedKey + "]");

Review comment:
       ```suggestion
           LOG.debug("Result Cache NamedKey [{}]", namedKey);
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/common/Config.java
##########
@@ -1218,4 +1300,30 @@
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean recover_with_empty_tablet = false;
-}
+
+    /**
+     * Sql_result_cache
+     * Whether or not the result cache is enabled in Fe level, it can be overwritten with connection/session
+     * level setting in Context.
+     */
+    @ConfField(mutable = true)
+    public static boolean enable_result_cache = false;

Review comment:
       Please add docs for the new config in `administrator-guide/config/fe_config.md`

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
##########
@@ -582,31 +592,55 @@ private void handleSetStmt() {
     }
 
     // Process a select statement.
-    private void handleQueryStmt() throws Exception {
+    private boolean handleQueryStmt() throws Exception {
         // Every time set no send flag and clean all data in buffer
         context.getMysqlChannel().reset();
         QueryStmt queryStmt = (QueryStmt) parsedStmt;
+        // Use connection ID as session identifier
+        NamedKey namedKey = new NamedKey(String.valueOf(context.getConnectionId()),

Review comment:
       We had a `originStmt.originStmt`, which is the origin sql string. how about use it instead of `queryStmt.toSql()`?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cache;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.doris.common.Config;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A base cache on each FE node, local only
+ */
+public class SimpleLocalCache implements Cache {
+    private static final Logger LOG = LogManager.getLogger(SimpleLocalCache.class);
+    /**
+     * Minimum cost in "weight" per entry;
+     */
+    private static final int FIXED_COST = 8;
+    private static final int MAX_DEFAULT_BYTES = 1024 * 1024 * 1024;
+    private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+    private static final LZ4FastDecompressor LZ4_DECOMPRESSOR = LZ4_FACTORY.fastDecompressor();
+    private static final LZ4Compressor LZ4_COMPRESSOR = LZ4_FACTORY.fastCompressor();
+
+    private final com.github.benmanes.caffeine.cache.Cache<NamedKey, byte[]> cache;
+
+    public static Cache create() {
+        return create(CacheExecutorFactory.COMMON_FJP.createExecutor());
+    }
+
+    // Used in testing
+    public static Cache create(final Executor executor) {
+        LOG.info("Instance cache with expiration " + Config.result_cache_expire_after_in_milliseconds
+                + " milliseconds, max size " + Config.result_cache_size_in_bytes + " bytes");
+        Caffeine<Object, Object> builder = Caffeine.newBuilder().recordStats();
+        if (Config.result_cache_expire_after_in_milliseconds >= 0) {
+            builder.expireAfterWrite(Config.result_cache_expire_after_in_milliseconds, TimeUnit.MILLISECONDS);
+        }
+        if (Config.result_cache_size_in_bytes >= 0) {
+            builder.maximumWeight(Config.result_cache_size_in_bytes);
+        } else {
+            builder.maximumWeight(Math.min(MAX_DEFAULT_BYTES, Runtime.getRuntime().maxMemory() / 10));
+        }
+        builder.weigher((NamedKey key, byte[] value) -> value.length
+                + key.key.length
+                + key.namespace.length() * Character.BYTES
+                + FIXED_COST)
+                .executor(executor);
+        return new SimpleLocalCache(builder.build());
+    }
+
+    private SimpleLocalCache(final com.github.benmanes.caffeine.cache.Cache<NamedKey, byte[]> cache) {
+        this.cache = cache;
+    }
+
+    @Override
+    public byte[] get(NamedKey key) {
+        return decompress(cache.getIfPresent(key));
+    }
+
+    @Override
+    public void put(NamedKey key, byte[] value) {
+        final byte[] compresssize = compress(value);
+        if (compresssize.length > Config.result_cache_size_per_query_in_bytes) {
+            LOG.info(" result size more than result_cache_size_per_query_in_bytes: "
+                    + Config.result_cache_size_per_query_in_bytes + " so not storage in cache");
+            return;
+        }
+        cache.put(key, compress(value));

Review comment:
       ```suggestion
           cache.put(key, compresssize);
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
##########
@@ -582,31 +592,55 @@ private void handleSetStmt() {
     }
 
     // Process a select statement.
-    private void handleQueryStmt() throws Exception {
+    private boolean handleQueryStmt() throws Exception {
         // Every time set no send flag and clean all data in buffer
         context.getMysqlChannel().reset();
         QueryStmt queryStmt = (QueryStmt) parsedStmt;
+        // Use connection ID as session identifier
+        NamedKey namedKey = new NamedKey(String.valueOf(context.getConnectionId()),
+                StringUtils.toUtf8(queryStmt.toSql()));
+        LOG.debug("Result Cache NamedKey [" + namedKey + "]");
+
 
         QueryDetail queryDetail = new QueryDetail(context.getStartTime(),
-                                                  DebugUtil.printId(context.queryId()),
-                                                  context.getStartTime(), -1, -1,
-                                                  QueryDetail.QueryMemState.RUNNING,
-                                                  context.getDatabase(),
-                                                  originStmt.originStmt);
+                DebugUtil.printId(context.queryId()),
+                context.getStartTime(), -1, -1,
+                QueryDetail.QueryMemState.RUNNING,
+                context.getDatabase(),
+                originStmt.originStmt);
         context.setQueryDetail(queryDetail);
         QueryDetailQueue.addOrUpdateQueryDetail(queryDetail);
 
         if (queryStmt.isExplain()) {
-            String explainString = planner.getExplainString(planner.getFragments(), queryStmt.isVerbose() ? TExplainLevel.VERBOSE: TExplainLevel.NORMAL.NORMAL);
+            String explainString = planner.getExplainString(planner.getFragments(), queryStmt.isVerbose() ? TExplainLevel.VERBOSE : TExplainLevel.NORMAL.NORMAL);
             handleExplainStmt(explainString);
-            return;
+            return false;
         }
         coord = new Coordinator(context, analyzer, planner);
 
-        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), 
+        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                 new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
 
-        coord.exec();
+        boolean isCacheEnabled = context.getSessionVariable().isEnableResultCache();
+        LOG.debug("Session level cache is " + (isCacheEnabled ? "enabled" : false));

Review comment:
       ```suggestion
           LOG.debug("Session level cache is {}", (isCacheEnabled ? "enabled" : false));
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/cache/SimpleLocalCache.java
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cache;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.doris.common.Config;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A base cache on each FE node, local only
+ */
+public class SimpleLocalCache implements Cache {
+    private static final Logger LOG = LogManager.getLogger(SimpleLocalCache.class);
+    /**
+     * Minimum cost in "weight" per entry;
+     */
+    private static final int FIXED_COST = 8;
+    private static final int MAX_DEFAULT_BYTES = 1024 * 1024 * 1024;
+    private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+    private static final LZ4FastDecompressor LZ4_DECOMPRESSOR = LZ4_FACTORY.fastDecompressor();
+    private static final LZ4Compressor LZ4_COMPRESSOR = LZ4_FACTORY.fastCompressor();
+
+    private final com.github.benmanes.caffeine.cache.Cache<NamedKey, byte[]> cache;
+
+    public static Cache create() {
+        return create(CacheExecutorFactory.COMMON_FJP.createExecutor());
+    }
+
+    // Used in testing
+    public static Cache create(final Executor executor) {
+        LOG.info("Instance cache with expiration " + Config.result_cache_expire_after_in_milliseconds
+                + " milliseconds, max size " + Config.result_cache_size_in_bytes + " bytes");
+        Caffeine<Object, Object> builder = Caffeine.newBuilder().recordStats();
+        if (Config.result_cache_expire_after_in_milliseconds >= 0) {
+            builder.expireAfterWrite(Config.result_cache_expire_after_in_milliseconds, TimeUnit.MILLISECONDS);
+        }
+        if (Config.result_cache_size_in_bytes >= 0) {
+            builder.maximumWeight(Config.result_cache_size_in_bytes);
+        } else {
+            builder.maximumWeight(Math.min(MAX_DEFAULT_BYTES, Runtime.getRuntime().maxMemory() / 10));
+        }
+        builder.weigher((NamedKey key, byte[] value) -> value.length
+                + key.key.length
+                + key.namespace.length() * Character.BYTES
+                + FIXED_COST)
+                .executor(executor);
+        return new SimpleLocalCache(builder.build());
+    }
+
+    private SimpleLocalCache(final com.github.benmanes.caffeine.cache.Cache<NamedKey, byte[]> cache) {
+        this.cache = cache;
+    }
+
+    @Override
+    public byte[] get(NamedKey key) {
+        return decompress(cache.getIfPresent(key));
+    }
+
+    @Override
+    public void put(NamedKey key, byte[] value) {
+        final byte[] compresssize = compress(value);
+        if (compresssize.length > Config.result_cache_size_per_query_in_bytes) {
+            LOG.info(" result size more than result_cache_size_per_query_in_bytes: "

Review comment:
       ```suggestion
               LOG.debug(" result size more than result_cache_size_per_query_in_bytes: "
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org