You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/28 13:47:04 UTC

[kylin] 04/04: KYLIN-4507 Override getSocketAddress() in TCPMemcachedNodeImpl to auto detect memcached instance ip change

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit a585119a59f4520114dd2d6fb357aeab41b32977
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Wed May 20 17:24:26 2020 +0800

    KYLIN-4507 Override getSocketAddress() in TCPMemcachedNodeImpl to auto detect memcached instance ip change
---
 .../net/spy/memcached/protocol/TCPMemcachedNodeImpl.java   | 11 ++++++++++-
 .../cachemanager/RemoteLocalFailOverCacheManager.java      |  5 +++++
 .../cachemanager/RemoteLocalFailOverCacheManagerTest.java  | 14 ++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
index 22dd730..f7da57e 100644
--- a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
+++ b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -19,10 +19,12 @@
 package net.spy.memcached.protocol;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.UnsupportedAddressTypeException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
@@ -42,6 +44,9 @@ import net.spy.memcached.protocol.binary.TapAckOperationImpl;
 /**
  * Represents a node with the memcached cluster, along with buffering and
  * operation queues.
+ *
+ * This is a modified version of the net.spy.memcached.protocol.TCPMemcachedNodeImpl
+ * Override the final method getSocketAddress() to refresh SocketAddress to achieve same hostname with ip changing
  */
 public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode {
 
@@ -415,7 +420,11 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements Memcache
      * @see net.spy.memcached.MemcachedNode#getSocketAddress()
      */
     public final SocketAddress getSocketAddress() {
-        return socketAddress;
+        if (!(socketAddress instanceof InetSocketAddress)) {
+            throw new UnsupportedAddressTypeException();
+        }
+        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+        return new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
     }
 
     /*
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
index 22517f4..aae0d7c 100644
--- a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
@@ -68,4 +68,9 @@ public class RemoteLocalFailOverCacheManager extends AbstractCacheManager {
     void enableRemoteCacheManager() {
         remoteCacheManager.setClusterHealth(true);
     }
+
+    @VisibleForTesting
+    MemcachedCacheManager getRemoteCacheManager() {
+        return remoteCacheManager;
+    }
 }
\ No newline at end of file
diff --git a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
index 243e386..c45dd6f 100644
--- a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
+++ b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
@@ -32,6 +32,8 @@ import org.springframework.cache.ehcache.EhCacheCache;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+//
+//import net.spy.memcached.MemcachedClientIF;
 
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(locations = { "classpath:cacheContext.xml" })
@@ -58,5 +60,17 @@ public class RemoteLocalFailOverCacheManagerTest {
         cacheManager.enableRemoteCacheManager();
         Assert.assertTrue("Memcached enabled",
                 cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor);
+//
+//        MemcachedCacheManager remoteCacheManager = cacheManager.getRemoteCacheManager();
+//        for (int i = 0; i < 1000; i++) {
+//            MemcachedClientIF client = (MemcachedClientIF) remoteCacheManager.getCache(QUERY_CACHE).getNativeCache();
+//            System.out.println(i + " available servers: " + client.getAvailableServers() + "; unavailable servers: "
+//                    + client.getUnavailableServers());
+//            try {
+//                client.get("key");
+//                Thread.sleep(2000L);
+//            } catch (Exception e) {
+//            }
+//        }
     }
 }
\ No newline at end of file