You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/08/04 07:47:47 UTC

[GitHub] [incubator-doris] marising opened a new pull request #4248: [Feature][Cache] Cache proxy and coordinator #2581

marising opened a new pull request #4248:
URL: https://github.com/apache/incubator-doris/pull/4248


   ## Proposed changes
   1. Cache's abstract proxy class and BE's Cache implementation
   2. Cache coordinator implemented by consistent hashing
   #2581
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   - [] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [] Documentation Update (if none of the other choices apply)
   - [] Code refactor (Modify the code structure, format the code, etc...)
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [x] I have create an issue on #ISSUE, and have described the bug/feature there in detail
   - [x] Compiling and unit tests pass locally with my changes
   - [x] I have added tests that prove my fix is effective or that my feature works
   - [] If this change need a document change, I have updated the document
   - [] Any dependent changes have been merged
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #4248: [Feature][Cache] Cache proxy and coordinator #2581

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #4248:
URL: https://github.com/apache/incubator-doris/pull/4248#discussion_r464991814



##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import org.apache.doris.proto.PCacheStatus;
+import org.apache.doris.proto.PCacheResponse;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PClearType;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.qe.SimpleScheduler;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.common.Status;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.List;
+
+/**
+ * Encapsulates access to BE, including network and other exception handlin
+ */
+public class CacheBeProxy extends CacheProxy {
+    private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
+
+    public void updateCache(UpdateCacheRequest request, Status status) {
+        PUniqueId sqlKey = request.sql_key;
+        Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
+        if (backend == null) {
+            LOG.warn("update cache can't find backend, sqlKey {}", sqlKey);
+            return;
+        }
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        try {
+            PUpdateCacheRequest updateRequest = request.getRpcRequest();
+            Future<PCacheResponse> future = BackendServiceProxy.getInstance().updateCache(address, updateRequest);
+            PCacheResponse response = future.get(10000,TimeUnit.MICROSECONDS);
+            if( response.status == PCacheStatus.CACHE_OK) {

Review comment:
       Please code format this file.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import org.apache.doris.proto.PCacheStatus;
+import org.apache.doris.proto.PCacheResponse;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PClearType;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.qe.SimpleScheduler;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.common.Status;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.List;
+
+/**
+ * Encapsulates access to BE, including network and other exception handlin

Review comment:
       ```suggestion
    * Encapsulates access to BE, including network and other exception handling
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.proto.PCacheParam;
+import org.apache.doris.proto.PCacheValue;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.qe.RowBatch;
+import org.apache.doris.thrift.TResultBatch;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * It encapsulates the request and response parameters and methods, 
+ * Based on this abstract class, the cache can be placed in FE/BE  and other places such as redis
+ */
+public abstract class CacheProxy {
+    private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
+
+    public static class CacheParam extends PCacheParam {
+        public CacheParam(PCacheParam param) {
+            partition_key = param.partition_key;
+            last_version = param.last_version;
+            last_version_time = param.last_version_time;
+        }
+
+        public CacheParam(long partitionKey, long lastVersion, long lastVersionTime) {
+            partition_key = partitionKey;
+            last_version = lastVersion;
+            last_version_time = lastVersionTime;
+        }
+
+        public PCacheParam getRParam() {
+            PCacheParam param = new PCacheParam();
+            param.partition_key = partition_key;
+            param.last_version = last_version;
+            param.last_version_time = last_version_time;
+            return param;
+        }
+
+        public void Debug() {

Review comment:
       ```suggestion
           public void debug() {
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.proto.PCacheParam;
+import org.apache.doris.proto.PCacheValue;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.qe.RowBatch;
+import org.apache.doris.thrift.TResultBatch;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * It encapsulates the request and response parameters and methods, 
+ * Based on this abstract class, the cache can be placed in FE/BE  and other places such as redis
+ */
+public abstract class CacheProxy {
+    private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
+
+    public static class CacheParam extends PCacheParam {
+        public CacheParam(PCacheParam param) {
+            partition_key = param.partition_key;
+            last_version = param.last_version;
+            last_version_time = param.last_version_time;
+        }
+
+        public CacheParam(long partitionKey, long lastVersion, long lastVersionTime) {
+            partition_key = partitionKey;
+            last_version = lastVersion;
+            last_version_time = lastVersionTime;
+        }
+
+        public PCacheParam getRParam() {
+            PCacheParam param = new PCacheParam();
+            param.partition_key = partition_key;
+            param.last_version = last_version;
+            param.last_version_time = last_version_time;
+            return param;
+        }
+
+        public void Debug() {
+            LOG.info("cache param, part key {}, version {}, time {}",
+                    partition_key, last_version, last_version_time);
+        }
+    }
+
+    public static class CacheValue extends PCacheValue {
+        public CacheParam param;
+        public TResultBatch resultBatch;
+
+        public CacheValue() {
+            param = null;
+            row = Lists.newArrayList();
+            data_size = 0;
+            resultBatch = new TResultBatch();
+        }
+
+        public void addRpcResult(PCacheValue value) {
+            param = new CacheParam(value.param);
+            data_size += value.data_size;
+            row.addAll(value.row);
+        }
+
+        public RowBatch getRowBatch() {
+            for (byte[] one : row) {
+                resultBatch.addToRows(ByteBuffer.wrap(one));
+            }
+            RowBatch batch = new RowBatch();
+            resultBatch.setPacket_seq(1);
+            resultBatch.setIs_compressed(false);
+            batch.setBatch(resultBatch);
+            batch.setEos(true);
+            return batch;
+        }
+
+        public void addUpdateResult(long partitionKey, long lastVersion, long lastVersionTime, List<byte[]> rowList) {
+            param = new CacheParam(partitionKey, lastVersion, lastVersionTime);
+            for (byte[] buf : rowList) {
+                data_size += buf.length;
+                row.add(buf);
+            }
+        }
+
+        public PCacheValue getRpcValue() {
+            PCacheValue value = new PCacheValue();
+            value.param = param.getRParam();
+            value.data_size = data_size;
+            value.row = row;
+            return value;
+        }
+
+        public void Debug() {
+            LOG.info("cache value, partkey {}, ver:{}, time {}, row_num {}, data_size {}",
+                    param.partition_key, param.last_version, param.last_version_time,
+                    row.size(),
+                    data_size);
+            for (int i = 0; i < row.size(); i++) {
+                LOG.info("{}:{}", i, row.get(i));
+            }
+        }
+    }
+
+    public static class UpdateCacheRequest extends PUpdateCacheRequest {
+        public int value_count;
+        public int row_count;
+        public int data_size;
+        private String sqlStr;
+        private List<CacheValue> valueList;
+
+        public UpdateCacheRequest(String sqlStr) {
+            this.sqlStr = sqlStr;
+            this.sql_key = getMd5(this.sqlStr);
+            this.valueList = Lists.newArrayList();
+            value_count = 0;
+            row_count = 0;
+            data_size = 0;
+        }
+
+        public void addValue(long partitionKey, long lastVersion, long lastVersionTime, List<byte[]> rowList) {
+            CacheValue value = new CacheValue();
+            value.addUpdateResult(partitionKey, lastVersion, lastVersionTime, rowList);
+            valueList.add(value);
+            value_count++;
+        }
+
+        public PUpdateCacheRequest getRpcRequest() {
+            value_count = valueList.size();
+            PUpdateCacheRequest request = new PUpdateCacheRequest();
+            request.value = Lists.newArrayList();
+            request.sql_key = sql_key;
+            for (CacheValue value : valueList) {
+                request.value.add(value.getRpcValue());
+                row_count += value.row.size();
+                data_size = value.data_size;
+            }
+            return request;
+        }
+
+        public void Debug() {
+            LOG.info("update cache request, sql_key {}, value_size {}", DebugUtil.printId(sql_key),
+                    valueList.size());
+            for (CacheValue value : valueList) {
+                value.Debug();
+            }
+        }
+    }
+
+
+    public static class FetchCacheRequest extends PFetchCacheRequest {
+        private String sqlStr;
+        private List<CacheParam> paramList;
+
+        public FetchCacheRequest(String sqlStr) {
+            this.sqlStr = sqlStr;
+            this.sql_key = getMd5(this.sqlStr);
+            this.paramList = Lists.newArrayList();
+        }
+
+        public void addParam(long partitionKey, long lastVersion, long lastVersionTime) {
+            CacheParam param = new CacheParam(partitionKey, lastVersion, lastVersionTime);
+            paramList.add(param);
+        }
+
+        public PFetchCacheRequest getRpcRequest() {
+            PFetchCacheRequest request = new PFetchCacheRequest();
+            request.param = Lists.newArrayList();
+            request.sql_key = sql_key;
+            for (CacheParam param : paramList) {
+                request.param.add(param.getRParam());
+            }
+            return request;
+        }
+
+        public void Debug() {
+            LOG.info("fetch cache request, sql_key {}, param count {}", DebugUtil.printId(sql_key), paramList.size());
+            for (CacheParam param : paramList) {
+                param.Debug();
+            }
+        }
+    }
+
+    public static class FetchCacheResult extends PFetchCacheResult {
+        public int all_count;
+        public int value_count;
+        public int row_count;
+        public int data_size;
+        private List<CacheValue> valueList;
+
+        public FetchCacheResult() {
+            valueList = Lists.newArrayList();
+            all_count = 0;
+            value_count = 0;
+            row_count = 0;
+            data_size = 0;
+        }
+
+        public List<CacheValue> getValueList() {
+            return valueList;
+        }
+
+        public void setResult(PFetchCacheResult rpcResult) {
+            value_count = rpcResult.value.size();
+            for (int i = 0; i < rpcResult.value.size(); i++) {
+                PCacheValue rpcValue = rpcResult.value.get(i);
+                CacheValue value = new CacheValue();
+                value.addRpcResult(rpcValue);
+                valueList.add(value);
+                row_count += value.row.size();
+                data_size += value.data_size;
+            }
+        }
+
+        public void Debug() {
+            LOG.info("fetch cache result, value size {}", valueList.size());
+            for (CacheValue value : valueList) {
+                value.Debug();
+            }
+        }
+    }
+
+    public enum CacheProxyType {
+        FE,
+        BE,
+        OUTER
+    }
+
+    protected CacheProxy(){
+    }
+
+    public static CacheProxy getCacheProxy(CacheProxyType type) {
+        switch (type) {
+            case BE:
+                return new CacheBeProxy();
+            case FE:
+            case OUTER:
+                return null;
+        }
+        return null;
+    }
+
+    public abstract void updateCache(UpdateCacheRequest request, Status status);
+
+    public abstract FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status);
+
+    public abstract void clearCache(PClearCacheRequest clearRequest);
+
+
+    public static PUniqueId getMd5(String str) {
+        MessageDigest msgDigest;
+        try {
+            //128 bit
+            msgDigest = MessageDigest.getInstance("MD5");
+        } catch (Exception e) {
+            return null;
+        }
+        final byte[] digest = msgDigest.digest(str.getBytes());
+        PUniqueId key = new PUniqueId();
+        key.lo = getLong(digest, 0);//64 bit
+        key.hi = getLong(digest, 8);//64 bit
+        return key;
+    }
+
+    public static final long getLong(final byte[] array, final int offset) {

Review comment:
       Need a concrete method name

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import org.apache.doris.proto.PCacheStatus;
+import org.apache.doris.proto.PCacheResponse;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PClearType;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.qe.SimpleScheduler;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.common.Status;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.List;
+
+/**
+ * Encapsulates access to BE, including network and other exception handlin
+ */
+public class CacheBeProxy extends CacheProxy {
+    private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
+
+    public void updateCache(UpdateCacheRequest request, Status status) {
+        PUniqueId sqlKey = request.sql_key;
+        Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
+        if (backend == null) {
+            LOG.warn("update cache can't find backend, sqlKey {}", sqlKey);
+            return;
+        }
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        try {
+            PUpdateCacheRequest updateRequest = request.getRpcRequest();
+            Future<PCacheResponse> future = BackendServiceProxy.getInstance().updateCache(address, updateRequest);
+            PCacheResponse response = future.get(10000,TimeUnit.MICROSECONDS);

Review comment:
       Would better make the `10000` static const value. In the future, we maybe need it to be configurable.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.qe.SimpleScheduler;
+import org.apache.doris.system.Backend;
+import org.apache.doris.proto.PUniqueId;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Use consistent hashing to find the BE corresponding to the key to avoid the change of BE leading to failure to hit the Cache
+ */
+public class CacheCoordinator {
+    private static final Logger LOG = LogManager.getLogger(CacheCoordinator.class);
+    private static final int VIRTUAL_NODES = 10;
+    private static final int REFRESH_NODE_TIME = 300000;
+    public boolean DebugModel = false;
+    private Hashtable<Long, Backend> realNodes = new Hashtable<>();
+    private SortedMap<Long, Backend> virtualNodes = new TreeMap<>();
+    private static Lock belock = new ReentrantLock();
+
+    private long lastRefreshTime;
+    private static CacheCoordinator cachePartition;
+
+    public static CacheCoordinator getInstance() {
+        if (cachePartition == null) {
+            cachePartition = new CacheCoordinator();
+        }
+        return cachePartition;
+    }
+
+    protected CacheCoordinator() {
+    }
+
+    /**
+     * Using the consistent hash and the hi part of sqlkey to get the backend node
+     *
+     * @param sqlKey 128 bit's sql md5
+     * @return Backend
+     */
+    public Backend findBackend(PUniqueId sqlKey) {
+        checkBackend();
+        Backend virtualNode = null;
+        try {
+            belock.lock();
+            SortedMap<Long, Backend> headMap = virtualNodes.headMap(sqlKey.hi);
+            SortedMap<Long, Backend> tailMap = virtualNodes.tailMap(sqlKey.hi);
+            int retryTimes = 0;
+            while (true) {
+                if (tailMap == null || tailMap.size() == 0) {
+                    tailMap = headMap;
+                    retryTimes += 1;
+                }
+                Long key = tailMap.firstKey();
+                virtualNode = tailMap.get(key);
+                if (SimpleScheduler.isAlive(virtualNode)) {
+                    break;
+                } else {
+                    LOG.debug("backend {} not alive, key {}, retry {}", virtualNode.getId(), key, retryTimes);
+                    virtualNode = null;
+                }
+                tailMap = tailMap.tailMap(key + 1);
+                retryTimes++;
+                if (retryTimes >= 5) {
+                    LOG.warn("find backend, reach max retry times {}", retryTimes);
+                    break;
+                }
+            }
+        } finally {
+            belock.unlock();
+        }
+        return virtualNode;
+    }
+
+    public void checkBackend() {

Review comment:
       What's the meaning of `checkBackend`?  which `clearBackend` firstly, then `addBackend`. `rebuildBackend` or `resetBackend` seems better?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import org.apache.doris.proto.PCacheStatus;
+import org.apache.doris.proto.PCacheResponse;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PClearType;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.qe.SimpleScheduler;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.common.Status;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.List;
+
+/**
+ * Encapsulates access to BE, including network and other exception handlin
+ */
+public class CacheBeProxy extends CacheProxy {
+    private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
+
+    public void updateCache(UpdateCacheRequest request, Status status) {
+        PUniqueId sqlKey = request.sql_key;
+        Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
+        if (backend == null) {
+            LOG.warn("update cache can't find backend, sqlKey {}", sqlKey);
+            return;
+        }
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        try {
+            PUpdateCacheRequest updateRequest = request.getRpcRequest();
+            Future<PCacheResponse> future = BackendServiceProxy.getInstance().updateCache(address, updateRequest);
+            PCacheResponse response = future.get(10000,TimeUnit.MICROSECONDS);
+            if( response.status == PCacheStatus.CACHE_OK) {
+                status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
+            }else {
+                status.setStatus(response.status.toString());
+            }
+        } catch (Exception e) {
+            LOG.warn("update cache exception, sqlKey {}, e {}", sqlKey, e);
+            status.setRpcStatus(e.getMessage());
+            SimpleScheduler.addToBlacklist(backend.getId());
+        } finally {

Review comment:
       If do nothing, we could remove finally.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.proto.PCacheParam;
+import org.apache.doris.proto.PCacheValue;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.qe.RowBatch;
+import org.apache.doris.thrift.TResultBatch;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * It encapsulates the request and response parameters and methods, 
+ * Based on this abstract class, the cache can be placed in FE/BE  and other places such as redis
+ */
+public abstract class CacheProxy {
+    private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
+
+    public static class CacheParam extends PCacheParam {
+        public CacheParam(PCacheParam param) {
+            partition_key = param.partition_key;
+            last_version = param.last_version;
+            last_version_time = param.last_version_time;
+        }
+
+        public CacheParam(long partitionKey, long lastVersion, long lastVersionTime) {
+            partition_key = partitionKey;
+            last_version = lastVersion;
+            last_version_time = lastVersionTime;
+        }
+
+        public PCacheParam getRParam() {

Review comment:
       getParam ?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.proto.PCacheParam;
+import org.apache.doris.proto.PCacheValue;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.proto.PFetchCacheRequest;
+import org.apache.doris.proto.PFetchCacheResult;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.qe.RowBatch;
+import org.apache.doris.thrift.TResultBatch;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * It encapsulates the request and response parameters and methods, 
+ * Based on this abstract class, the cache can be placed in FE/BE  and other places such as redis
+ */
+public abstract class CacheProxy {
+    private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
+
+    public static class CacheParam extends PCacheParam {
+        public CacheParam(PCacheParam param) {
+            partition_key = param.partition_key;
+            last_version = param.last_version;
+            last_version_time = param.last_version_time;
+        }
+
+        public CacheParam(long partitionKey, long lastVersion, long lastVersionTime) {
+            partition_key = partitionKey;
+            last_version = lastVersion;
+            last_version_time = lastVersionTime;
+        }
+
+        public PCacheParam getRParam() {
+            PCacheParam param = new PCacheParam();
+            param.partition_key = partition_key;
+            param.last_version = last_version;
+            param.last_version_time = last_version_time;
+            return param;
+        }
+
+        public void Debug() {
+            LOG.info("cache param, part key {}, version {}, time {}",
+                    partition_key, last_version, last_version_time);
+        }
+    }
+
+    public static class CacheValue extends PCacheValue {
+        public CacheParam param;
+        public TResultBatch resultBatch;
+
+        public CacheValue() {
+            param = null;
+            row = Lists.newArrayList();

Review comment:
       ```suggestion
               rows = Lists.newArrayList();
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen merged pull request #4248: [Feature][Cache] Cache proxy and coordinator #2581

Posted by GitBox <gi...@apache.org>.
kangkaisen merged pull request #4248:
URL: https://github.com/apache/incubator-doris/pull/4248


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org