You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/28 13:47:00 UTC

[kylin] branch master updated (52edac1 -> a585119)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 52edac1  KYLIN-4521 make the default-time-filter enable
     new 84064d3  KYLIN-4505 Change guava cache to spring cache for user authentication
     new f815c4f  KYLIN-4506 Remove unresolved memcached servers before initialize MemcachedClient
     new 03e67bc  KYLIN-4507 Add hack file TCPMemcachedNodeImpl.java
     new a585119  KYLIN-4507 Override getSocketAddress() in TCPMemcachedNodeImpl to auto detect memcached instance ip change

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../memcached/protocol/TCPMemcachedNodeImpl.java   | 650 +++++++++++++++++++++
 .../kylin/cache/cachemanager/CacheConstants.java   |   1 +
 .../cache/cachemanager/MemcachedCacheManager.java  |   3 +
 .../RemoteLocalFailOverCacheManager.java           |   5 +
 .../kylin/cache/memcached/MemcachedCache.java      |  16 +-
 .../RemoteLocalFailOverCacheManagerTest.java       |  14 +
 .../kylin/cache/memcached/MemcachedCacheTest.java  |   9 +
 .../rest/security/KylinAuthenticationProvider.java |  44 +-
 .../kylin/rest/service/KylinUserService.java       |  14 -
 .../org/apache/kylin/rest/service/UserService.java |  10 +-
 server/src/main/resources/ehcache-test.xml         |   7 +
 server/src/main/resources/ehcache.xml              |   7 +
 12 files changed, 735 insertions(+), 45 deletions(-)
 create mode 100644 cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java


[kylin] 02/04: KYLIN-4506 Remove unresolved memcached servers before initialize MemcachedClient

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f815c4ff1c4aa86692adbbbdfb93e4979f9cd49a
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Wed May 20 17:11:20 2020 +0800

    KYLIN-4506 Remove unresolved memcached servers before initialize MemcachedClient
---
 .../org/apache/kylin/cache/memcached/MemcachedCache.java | 16 +++++++++++++++-
 .../apache/kylin/cache/memcached/MemcachedCacheTest.java |  9 +++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
index e7ff75f..784665e 100644
--- a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
@@ -20,8 +20,11 @@ package org.apache.kylin.cache.memcached;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -136,13 +139,24 @@ public class MemcachedCache {
                     .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
                     .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
             return new MemcachedCache(new MemcachedClient(new MemcachedConnectionFactory(connectionFactory),
-                    AddrUtil.getAddresses(hostsStr)), config, memcachedPrefix, timeToLive);
+                    getResolvedAddrList(hostsStr)), config, memcachedPrefix, timeToLive);
         } catch (IOException e) {
             logger.error("Unable to create MemcachedCache instance.", e);
             throw Throwables.propagate(e);
         }
     }
 
+    public static List<InetSocketAddress> getResolvedAddrList(String hostsStr) {
+        List<InetSocketAddress> addrs = AddrUtil.getAddresses(hostsStr);
+        Iterator<InetSocketAddress> addrIterator = addrs.iterator();
+        while (addrIterator.hasNext()) {
+            if (addrIterator.next().isUnresolved()) {
+                addrIterator.remove();
+            }
+        }
+        return addrs;
+    }
+
     public String getName() {
         return memcachedPrefix;
     }
diff --git a/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
index 4fcbc5e..a4f8fc0 100644
--- a/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
+++ b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
@@ -21,6 +21,8 @@ package org.apache.kylin.cache.memcached;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -81,4 +83,11 @@ public class MemcachedCacheTest extends LocalFileMetadataTestCase {
             Assert.assertEquals("The value should not change", keyValueMap.get(key), memCachedAdaptor.get(key).get());
         }
     }
+
+    @Test
+    public void testGetResolvedAddrList() {
+        String hostsStr = "localhost:11211,fafddafaf:11211,fadfafaerqr:11211";
+        List<InetSocketAddress> addrList = MemcachedCache.getResolvedAddrList(hostsStr);
+        Assert.assertEquals(1, addrList.size());
+    }
 }
\ No newline at end of file


[kylin] 01/04: KYLIN-4505 Change guava cache to spring cache for user authentication

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 84064d32a64c9f942e6c4cc0e1fac243a5ee2b89
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Wed May 20 16:41:45 2020 +0800

    KYLIN-4505 Change guava cache to spring cache for user authentication
---
 .../kylin/cache/cachemanager/CacheConstants.java   |  1 +
 .../cache/cachemanager/MemcachedCacheManager.java  |  3 ++
 .../rest/security/KylinAuthenticationProvider.java | 44 +++++++++++-----------
 .../kylin/rest/service/KylinUserService.java       | 14 -------
 .../org/apache/kylin/rest/service/UserService.java | 10 ++---
 server/src/main/resources/ehcache-test.xml         |  7 ++++
 server/src/main/resources/ehcache.xml              |  7 ++++
 7 files changed, 42 insertions(+), 44 deletions(-)

diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
index 07b15a5..12d0c7a 100644
--- a/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
@@ -20,4 +20,5 @@ package org.apache.kylin.cache.cachemanager;
 
 public class CacheConstants {
     public static final String QUERY_CACHE = "StorageCache";
+    public static final String USER_CACHE = "UserCache";
 }
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
index f5acc6f..2ae49c8 100644
--- a/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
@@ -59,8 +59,11 @@ public class MemcachedCacheManager extends AbstractCacheManager {
     protected Collection<? extends Cache> loadCaches() {
         Cache successCache = new MemCachedCacheAdaptor(
                 new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.QUERY_CACHE)));
+        Cache userCache = new MemCachedCacheAdaptor(
+                new MemcachedCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.USER_CACHE, 86400)));
 
         addCache(successCache);
+        addCache(userCache);
 
         Collection<String> names = getCacheNames();
         Collection<Cache> caches = Lists.newArrayList();
diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java b/server-base/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
index 7ea3957..590c15a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
@@ -18,16 +18,22 @@
 
 package org.apache.kylin.rest.security;
 
+import static org.apache.kylin.cache.cachemanager.CacheConstants.USER_CACHE;
+
 import java.nio.charset.Charset;
 import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.rest.service.UserService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
 import org.springframework.security.authentication.AuthenticationProvider;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.AuthenticationException;
@@ -36,9 +42,7 @@ import org.springframework.security.core.userdetails.UserDetails;
 import org.springframework.security.core.userdetails.UsernameNotFoundException;
 import org.springframework.util.Assert;
 
-import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
-import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
-import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
 import org.apache.kylin.shaded.com.google.common.hash.Hashing;
 
@@ -49,21 +53,13 @@ public class KylinAuthenticationProvider implements AuthenticationProvider {
 
     private static final Logger logger = LoggerFactory.getLogger(KylinAuthenticationProvider.class);
 
-    private final static org.apache.kylin.shaded.com.google.common.cache.Cache<String, Authentication> userCache = CacheBuilder.newBuilder()
-            .maximumSize(KylinConfig.getInstanceFromEnv().getServerUserCacheMaxEntries())
-            .expireAfterWrite(KylinConfig.getInstanceFromEnv().getServerUserCacheExpireSeconds(), TimeUnit.SECONDS)
-            .removalListener(new RemovalListener<String, Authentication>() {
-                @Override
-                public void onRemoval(RemovalNotification<String, Authentication> notification) {
-                    KylinAuthenticationProvider.logger.debug("User cache {} is removed due to {}",
-                            notification.getKey(), notification.getCause());
-                }
-            }).build();
-
     @Autowired
     @Qualifier("userService")
     UserService userService;
 
+    @Autowired
+    private CacheManager cacheManager;
+
     //Embedded authentication provider
     private AuthenticationProvider authenticationProvider;
 
@@ -76,19 +72,21 @@ public class KylinAuthenticationProvider implements AuthenticationProvider {
         hf = Hashing.murmur3_128();
     }
 
+    @PostConstruct
+    public void init() {
+        Preconditions.checkNotNull(cacheManager, "cacheManager is not injected yet");
+    }
+    
     @Override
     public Authentication authenticate(Authentication authentication) throws AuthenticationException {
 
         byte[] hashKey = hf.hashString(authentication.getName() + authentication.getCredentials(), Charset.defaultCharset()).asBytes();
         String userKey = Arrays.toString(hashKey);
 
-        if (userService.isEvictCacheFlag()) {
-            userCache.invalidateAll();
-            userService.setEvictCacheFlag(false);
-        }
-        Authentication authed = userCache.getIfPresent(userKey);
-
-        if (null != authed) {
+        Authentication authed;
+        Cache.ValueWrapper authedUser = cacheManager.getCache(USER_CACHE).get(userKey);
+        if (authedUser != null) {
+            authed = (Authentication) authedUser.get();
             SecurityContextHolder.getContext().setAuthentication(authed);
         } else {
             try {
@@ -119,7 +117,7 @@ public class KylinAuthenticationProvider implements AuthenticationProvider {
                     userService.updateUser(user);
                 }
 
-                userCache.put(userKey, authed);
+                cacheManager.getCache(USER_CACHE).put(userKey, authed);
             } catch (AuthenticationException e) {
                 logger.error("Failed to auth user: " + authentication.getName(), e);
                 throw e;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/KylinUserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/KylinUserService.java
index 2e43ce4..1f93fc2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/KylinUserService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/KylinUserService.java
@@ -111,18 +111,6 @@ public class KylinUserService implements UserService {
 
     protected ResourceStore aclStore;
 
-    private boolean evictCacheFlag = false;
-
-    @Override
-    public boolean isEvictCacheFlag() {
-        return evictCacheFlag;
-    }
-
-    @Override
-    public void setEvictCacheFlag(boolean evictCacheFlag) {
-        this.evictCacheFlag = evictCacheFlag;
-    }
-
     @PostConstruct
     public void init() throws IOException {
         aclStore = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
@@ -156,7 +144,6 @@ public class KylinUserService implements UserService {
         }
         getKylinUserManager().update(managedUser);
         logger.trace("update user : {}", user.getUsername());
-        setEvictCacheFlag(true);
     }
 
     @Override
@@ -166,7 +153,6 @@ public class KylinUserService implements UserService {
         }
         getKylinUserManager().delete(userName);
         logger.trace("delete user : {}", userName);
-        setEvictCacheFlag(true);
     }
 
     @Override
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
index 90107a1..1734be2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -18,17 +18,13 @@
 
 package org.apache.kylin.rest.service;
 
-import org.apache.kylin.rest.security.ManagedUser;
-import org.springframework.security.provisioning.UserDetailsManager;
-
 import java.io.IOException;
 import java.util.List;
 
-public interface UserService extends UserDetailsManager {
-
-    boolean isEvictCacheFlag();
+import org.apache.kylin.rest.security.ManagedUser;
+import org.springframework.security.provisioning.UserDetailsManager;
 
-    void setEvictCacheFlag(boolean evictCacheFlag);
+public interface UserService extends UserDetailsManager {
 
     List<ManagedUser> listUsers() throws IOException;
 
diff --git a/server/src/main/resources/ehcache-test.xml b/server/src/main/resources/ehcache-test.xml
index 5bd4d13..47942de 100644
--- a/server/src/main/resources/ehcache-test.xml
+++ b/server/src/main/resources/ehcache-test.xml
@@ -20,6 +20,13 @@
             >
         <persistence strategy="none"/>
     </cache>
+    <cache name="UserCache"
+           eternal="false"
+           timeToLiveSeconds="10800"
+           memoryStoreEvictionPolicy="LRU"
+            >
+        <persistence strategy="none"/>
+    </cache>
     <cache name="ExceptionQueryCache"
            eternal="false"
            timeToIdleSeconds="86400"
diff --git a/server/src/main/resources/ehcache.xml b/server/src/main/resources/ehcache.xml
index c9efc13..ee10751 100644
--- a/server/src/main/resources/ehcache.xml
+++ b/server/src/main/resources/ehcache.xml
@@ -20,6 +20,13 @@
             >
         <persistence strategy="none"/>
     </cache>
+    <cache name="UserCache"
+           eternal="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+            >
+        <persistence strategy="none"/>
+    </cache>
     <cache name="ExceptionQueryCache"
            eternal="false"
            timeToIdleSeconds="86400"


[kylin] 03/04: KYLIN-4507 Add hack file TCPMemcachedNodeImpl.java

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 03e67bc1cf8fbb63ad59e868ae0287679bde4e5d
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Wed May 20 17:15:39 2020 +0800

    KYLIN-4507 Add hack file TCPMemcachedNodeImpl.java
---
 .../memcached/protocol/TCPMemcachedNodeImpl.java   | 641 +++++++++++++++++++++
 1 file changed, 641 insertions(+)

diff --git a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
new file mode 100644
index 0000000..22dd730
--- /dev/null
+++ b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached.protocol;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationState;
+import net.spy.memcached.protocol.binary.TapAckOperationImpl;
+
+/**
+ * Represents a node within the memcached cluster, along with buffering and
+ * operation queues.
+ */
+public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode {
+
+    private final SocketAddress socketAddress;
+    private final ByteBuffer rbuf;
+    private final ByteBuffer wbuf;
+    protected final BlockingQueue<Operation> writeQ;
+    private final BlockingQueue<Operation> readQ;
+    private final BlockingQueue<Operation> inputQueue;
+    private final long opQueueMaxBlockTime;
+    private final long authWaitTime;
+    private final ConnectionFactory connectionFactory;
+    private AtomicInteger reconnectAttempt = new AtomicInteger(1);
+    private SocketChannel channel;
+    private int toWrite = 0;
+    protected Operation optimizedOp = null;
+    private volatile SelectionKey sk = null;
+    private boolean shouldAuth = false;
+    private CountDownLatch authLatch;
+    private ArrayList<Operation> reconnectBlocked;
+    private long defaultOpTimeout;
+    private volatile long lastReadTimestamp = System.nanoTime();
+    private MemcachedConnection connection;
+
+    // operation Future.get timeout counter
+    private final AtomicInteger continuousTimeout = new AtomicInteger(0);
+
+    public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize, BlockingQueue<Operation> rq,
+            BlockingQueue<Operation> wq, BlockingQueue<Operation> iq, long opQueueMaxBlockTime, boolean waitForAuth,
+            long dt, long authWaitTime, ConnectionFactory fact) {
+        super();
+        assert sa != null : "No SocketAddress";
+        assert c != null : "No SocketChannel";
+        assert bufSize > 0 : "Invalid buffer size: " + bufSize;
+        assert rq != null : "No operation read queue";
+        assert wq != null : "No operation write queue";
+        assert iq != null : "No input queue";
+        socketAddress = sa;
+        connectionFactory = fact;
+        this.authWaitTime = authWaitTime;
+        setChannel(c);
+        // Since these buffers are allocated rarely (only on client creation
+        // or reconfigure), and are passed to Channel.read() and Channel.write(),
+        // use direct buffers to avoid
+        //   http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6214569
+        rbuf = ByteBuffer.allocateDirect(bufSize);
+        wbuf = ByteBuffer.allocateDirect(bufSize);
+        getWbuf().clear();
+        readQ = rq;
+        writeQ = wq;
+        inputQueue = iq;
+        this.opQueueMaxBlockTime = opQueueMaxBlockTime;
+        shouldAuth = waitForAuth;
+        defaultOpTimeout = dt;
+        setupForAuth();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#copyInputQueue()
+     */
+    public final void copyInputQueue() {
+        Collection<Operation> tmp = new ArrayList<Operation>();
+
+        // don't drain more than we have space to place
+        inputQueue.drainTo(tmp, writeQ.remainingCapacity());
+        writeQ.addAll(tmp);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#destroyInputQueue()
+     */
+    public Collection<Operation> destroyInputQueue() {
+        Collection<Operation> rv = new ArrayList<Operation>();
+        inputQueue.drainTo(rv);
+        return rv;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#setupResend()
+     */
+    public final void setupResend() {
+        // First, reset the current write op, or cancel it if we should
+        // be authenticating
+        Operation op = getCurrentWriteOp();
+        if (shouldAuth && op != null) {
+            op.cancel();
+        } else if (op != null) {
+            ByteBuffer buf = op.getBuffer();
+            if (buf != null) {
+                buf.reset();
+            } else {
+                getLogger().info("No buffer for current write op, removing");
+                removeCurrentWriteOp();
+            }
+        }
+        // Now cancel all the pending read operations. Might be better
+        // to requeue them.
+        while (hasReadOp()) {
+            op = removeCurrentReadOp();
+            if (op != getCurrentWriteOp()) {
+                getLogger().warn("Discarding partially completed op: %s", op);
+                op.cancel();
+            }
+        }
+
+        while (shouldAuth && hasWriteOp()) {
+            op = removeCurrentWriteOp();
+            getLogger().warn("Discarding partially completed op: %s", op);
+            op.cancel();
+        }
+
+        getWbuf().clear();
+        getRbuf().clear();
+        toWrite = 0;
+    }
+
+    // Prepare the pending operations. Return true if there are any pending
+    // ops
+    private boolean preparePending() {
+        // Copy the input queue into the write queue.
+        copyInputQueue();
+
+        // Now check the ops
+        Operation nextOp = getCurrentWriteOp();
+        while (nextOp != null && nextOp.isCancelled()) {
+            getLogger().info("Removing cancelled operation: %s", nextOp);
+            removeCurrentWriteOp();
+            nextOp = getCurrentWriteOp();
+        }
+        return nextOp != null;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#fillWriteBuffer(boolean)
+     */
+    public final void fillWriteBuffer(boolean shouldOptimize) {
+        if (toWrite == 0 && readQ.remainingCapacity() > 0) {
+            getWbuf().clear();
+            Operation o = getNextWritableOp();
+
+            while (o != null && toWrite < getWbuf().capacity()) {
+                synchronized (o) {
+                    assert o.getState() == OperationState.WRITING;
+
+                    ByteBuffer obuf = o.getBuffer();
+                    assert obuf != null : "Didn't get a write buffer from " + o;
+                    int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());
+                    byte[] b = new byte[bytesToCopy];
+                    obuf.get(b);
+                    getWbuf().put(b);
+                    getLogger().debug("After copying stuff from %s: %s", o, getWbuf());
+                    if (!o.getBuffer().hasRemaining()) {
+                        o.writeComplete();
+                        transitionWriteItem();
+
+                        preparePending();
+                        if (shouldOptimize) {
+                            optimize();
+                        }
+
+                        o = getNextWritableOp();
+                    }
+                    toWrite += bytesToCopy;
+                }
+            }
+            getWbuf().flip();
+            assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this;
+            assert toWrite == getWbuf().remaining() : "Expected " + toWrite + " remaining, got "
+                    + getWbuf().remaining();
+        } else {
+            getLogger().debug("Buffer is full, skipping");
+        }
+    }
+
+    private Operation getNextWritableOp() {
+        Operation o = getCurrentWriteOp();
+        while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
+            synchronized (o) {
+                if (o.isCancelled()) {
+                    getLogger().debug("Not writing cancelled op.");
+                    Operation cancelledOp = removeCurrentWriteOp();
+                    assert o == cancelledOp;
+                } else if (o.isTimedOut(defaultOpTimeout)) {
+                    getLogger().debug("Not writing timed out op.");
+                    Operation timedOutOp = removeCurrentWriteOp();
+                    assert o == timedOutOp;
+                } else {
+                    o.writing();
+                    if (!(o instanceof TapAckOperationImpl)) {
+                        readQ.add(o);
+                    }
+                    return o;
+                }
+                o = getCurrentWriteOp();
+            }
+        }
+        return o;
+    }
+
+    /* (non-Javadoc)
+     * @see net.spy.memcached.MemcachedNode#transitionWriteItem()
+     */
+    public final void transitionWriteItem() {
+        Operation op = removeCurrentWriteOp();
+        assert op != null : "There is no write item to transition";
+        getLogger().debug("Finished writing %s", op);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#optimize()
+     */
+    protected abstract void optimize();
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getCurrentReadOp()
+     */
+    public final Operation getCurrentReadOp() {
+        return readQ.peek();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#removeCurrentReadOp()
+     */
+    public final Operation removeCurrentReadOp() {
+        return readQ.remove();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getCurrentWriteOp()
+     */
+    public final Operation getCurrentWriteOp() {
+        return optimizedOp == null ? writeQ.peek() : optimizedOp;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#removeCurrentWriteOp()
+     */
+    public final Operation removeCurrentWriteOp() {
+        Operation rv = optimizedOp;
+        if (rv == null) {
+            rv = writeQ.remove();
+        } else {
+            optimizedOp = null;
+        }
+        return rv;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#hasReadOp()
+     */
+    public final boolean hasReadOp() {
+        return !readQ.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#hasWriteOp()
+     */
+    public final boolean hasWriteOp() {
+        return !(optimizedOp == null && writeQ.isEmpty());
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#addOp(net.spy.memcached.ops.Operation)
+     */
+    public final void addOp(Operation op) {
+        try {
+            if (!authLatch.await(authWaitTime, TimeUnit.MILLISECONDS)) {
+                FailureMode mode = connectionFactory.getFailureMode();
+                if (mode == FailureMode.Redistribute || mode == FailureMode.Retry) {
+                    getLogger().debug("Redistributing Operation " + op + " because auth " + "latch taken longer than "
+                            + authWaitTime + " milliseconds to " + "complete on node " + getSocketAddress());
+                    connection.retryOperation(op);
+                } else {
+                    op.cancel();
+                    getLogger().warn("Operation canceled because authentication "
+                            + "or reconnection and authentication has " + "taken more than " + authWaitTime
+                            + " milliseconds to " + "complete on node " + this);
+                    getLogger().debug("Canceled operation %s", op.toString());
+                }
+                return;
+            }
+            if (!inputQueue.offer(op, opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) {
+                throw new IllegalStateException(
+                        "Timed out waiting to add " + op + "(max wait=" + opQueueMaxBlockTime + "ms)");
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("Interrupted while waiting to add " + op);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * net.spy.memcached.MemcachedNode#insertOp(net.spy.memcached.ops.Operation)
+     */
+    public final void insertOp(Operation op) {
+        ArrayList<Operation> tmp = new ArrayList<Operation>(inputQueue.size() + 1);
+        tmp.add(op);
+        inputQueue.drainTo(tmp);
+        inputQueue.addAll(tmp);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getSelectionOps()
+     */
+    public final int getSelectionOps() {
+        int rv = 0;
+        if (getChannel().isConnected()) {
+            if (hasReadOp()) {
+                rv |= SelectionKey.OP_READ;
+            }
+            if (toWrite > 0 || hasWriteOp()) {
+                rv |= SelectionKey.OP_WRITE;
+            }
+        } else {
+            rv = SelectionKey.OP_CONNECT;
+        }
+        return rv;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getRbuf()
+     */
+    public final ByteBuffer getRbuf() {
+        return rbuf;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getWbuf()
+     */
+    public final ByteBuffer getWbuf() {
+        return wbuf;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getSocketAddress()
+     */
+    public final SocketAddress getSocketAddress() {
+        return socketAddress;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#isActive()
+     */
+    public final boolean isActive() {
+        return reconnectAttempt.get() == 0 && getChannel() != null && getChannel().isConnected();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#isAuthenticated()
+     */
+    public boolean isAuthenticated() {
+        return (0 == authLatch.getCount());
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#reconnecting()
+     */
+    public final void reconnecting() {
+        reconnectAttempt.incrementAndGet();
+        continuousTimeout.set(0);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#connected()
+     */
+    public final void connected() {
+        reconnectAttempt.set(0);
+        continuousTimeout.set(0);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getReconnectCount()
+     */
+    public final int getReconnectCount() {
+        return reconnectAttempt.get();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#toString()
+     */
+    @Override
+    public final String toString() {
+        int sops = 0;
+        if (getSk() != null && getSk().isValid()) {
+            sops = getSk().interestOps();
+        }
+        int rsize = readQ.size() + (optimizedOp == null ? 0 : 1);
+        int wsize = writeQ.size();
+        int isize = inputQueue.size();
+        return "{QA sa=" + getSocketAddress() + ", #Rops=" + rsize + ", #Wops=" + wsize + ", #iq=" + isize + ", topRop="
+                + getCurrentReadOp() + ", topWop=" + getCurrentWriteOp() + ", toWrite=" + toWrite + ", interested="
+                + sops + "}";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * net.spy.memcached.MemcachedNode#registerChannel
+     * (java.nio.channels.SocketChannel, java.nio.channels.SelectionKey)
+     */
+    public final void registerChannel(SocketChannel ch, SelectionKey skey) {
+        setChannel(ch);
+        setSk(skey);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * net.spy.memcached.MemcachedNode#setChannel(java.nio.channels.SocketChannel)
+     */
+    public final void setChannel(SocketChannel to) {
+        assert channel == null || !channel.isOpen() : "Attempting to overwrite channel";
+        channel = to;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getChannel()
+     */
+    public final SocketChannel getChannel() {
+        return channel;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#setSk(java.nio.channels.SelectionKey)
+     */
+    public final void setSk(SelectionKey to) {
+        sk = to;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getSk()
+     */
+    public final SelectionKey getSk() {
+        return sk;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getBytesRemainingToWrite()
+     */
+    public final int getBytesRemainingToWrite() {
+        return toWrite;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#writeSome()
+     */
+    public final int writeSome() throws IOException {
+        int wrote = channel.write(wbuf);
+        assert wrote >= 0 : "Wrote negative bytes?";
+        toWrite -= wrote;
+        assert toWrite >= 0 : "toWrite went negative after writing " + wrote + " bytes for " + this;
+        getLogger().debug("Wrote %d bytes", wrote);
+        return wrote;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#setContinuousTimeout
+     */
+    public void setContinuousTimeout(boolean timedOut) {
+        if (timedOut && isActive()) {
+            continuousTimeout.incrementAndGet();
+        } else {
+            continuousTimeout.set(0);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getContinuousTimeout
+     */
+    public int getContinuousTimeout() {
+        return continuousTimeout.get();
+    }
+
+    public final void fixupOps() {
+        // As the selection key can be changed at any point due to node
+        // failure, we'll grab the current volatile value and configure it.
+        SelectionKey s = sk;
+        if (s != null && s.isValid()) {
+            int iops = getSelectionOps();
+            getLogger().debug("Setting interested opts to %d", iops);
+            s.interestOps(iops);
+        } else {
+            getLogger().debug("Selection key is not valid.");
+        }
+    }
+
+    public final void authComplete() {
+        if (reconnectBlocked != null && reconnectBlocked.size() > 0) {
+            inputQueue.addAll(reconnectBlocked);
+        }
+        authLatch.countDown();
+    }
+
+    public final void setupForAuth() {
+        if (shouldAuth) {
+            authLatch = new CountDownLatch(1);
+            if (inputQueue.size() > 0) {
+                reconnectBlocked = new ArrayList<Operation>(inputQueue.size() + 1);
+                inputQueue.drainTo(reconnectBlocked);
+            }
+            assert (inputQueue.size() == 0);
+            setupResend();
+        } else {
+            authLatch = new CountDownLatch(0);
+        }
+    }
+
+    /**
+     * Number of milliseconds since the last read of this node completed.
+     *
+     * @return milliseconds since last read.
+     */
+    public long lastReadDelta() {
+        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastReadTimestamp);
+    }
+
+    /**
+     * Mark this node as having just completed a read.
+     */
+    public void completedRead() {
+        lastReadTimestamp = System.nanoTime();
+    }
+
+    @Override
+    public MemcachedConnection getConnection() {
+        return connection;
+    }
+
+    @Override
+    public void setConnection(MemcachedConnection connection) {
+        this.connection = connection;
+    }
+}
\ No newline at end of file


[kylin] 04/04: KYLIN-4507 Override getSocketAddress() in TCPMemcachedNodeImpl to auto detect memcached instance ip change

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit a585119a59f4520114dd2d6fb357aeab41b32977
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Wed May 20 17:24:26 2020 +0800

    KYLIN-4507 Override getSocketAddress() in TCPMemcachedNodeImpl to auto detect memcached instance ip change
---
 .../net/spy/memcached/protocol/TCPMemcachedNodeImpl.java   | 11 ++++++++++-
 .../cachemanager/RemoteLocalFailOverCacheManager.java      |  5 +++++
 .../cachemanager/RemoteLocalFailOverCacheManagerTest.java  | 14 ++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
index 22dd730..f7da57e 100644
--- a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
+++ b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -19,10 +19,12 @@
 package net.spy.memcached.protocol;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.UnsupportedAddressTypeException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
@@ -42,6 +44,9 @@ import net.spy.memcached.protocol.binary.TapAckOperationImpl;
 /**
  * Represents a node with the memcached cluster, along with buffering and
  * operation queues.
+ *
+ * This is a modified copy of net.spy.memcached.protocol.TCPMemcachedNodeImpl. The final method
+ * getSocketAddress() is changed to build a new address from the hostname on every call, so that an IP change is detected.
  */
 public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode {
 
@@ -415,7 +420,11 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements Memcache
      * @see net.spy.memcached.MemcachedNode#getSocketAddress()
      */
     public final SocketAddress getSocketAddress() {
-        return socketAddress;
+        if (!(socketAddress instanceof InetSocketAddress)) {
+            throw new UnsupportedAddressTypeException();
+        }
+        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+        return new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
     }
 
     /*
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
index 22517f4..aae0d7c 100644
--- a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
@@ -68,4 +68,9 @@ public class RemoteLocalFailOverCacheManager extends AbstractCacheManager {
     void enableRemoteCacheManager() {
         remoteCacheManager.setClusterHealth(true);
     }
+
+    @VisibleForTesting
+    MemcachedCacheManager getRemoteCacheManager() {
+        return remoteCacheManager;
+    }
 }
\ No newline at end of file
diff --git a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
index 243e386..c45dd6f 100644
--- a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
+++ b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
@@ -32,6 +32,8 @@ import org.springframework.cache.ehcache.EhCacheCache;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+//
+//import net.spy.memcached.MemcachedClientIF;
 
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(locations = { "classpath:cacheContext.xml" })
@@ -58,5 +60,17 @@ public class RemoteLocalFailOverCacheManagerTest {
         cacheManager.enableRemoteCacheManager();
         Assert.assertTrue("Memcached enabled",
                 cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor);
+//
+//        MemcachedCacheManager remoteCacheManager = cacheManager.getRemoteCacheManager();
+//        for (int i = 0; i < 1000; i++) {
+//            MemcachedClientIF client = (MemcachedClientIF) remoteCacheManager.getCache(QUERY_CACHE).getNativeCache();
+//            System.out.println(i + " available servers: " + client.getAvailableServers() + "; unavailable servers: "
+//                    + client.getUnavailableServers());
+//            try {
+//                client.get("key");
+//                Thread.sleep(2000L);
+//            } catch (Exception e) {
+//            }
+//        }
     }
 }
\ No newline at end of file