You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/28 13:47:04 UTC
[kylin] 04/04: KYLIN-4507 Override getSocketAddress() in
TCPMemcachedNodeImpl to auto detect memcached instance ip change
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a585119a59f4520114dd2d6fb357aeab41b32977
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Wed May 20 17:24:26 2020 +0800
KYLIN-4507 Override getSocketAddress() in TCPMemcachedNodeImpl to auto detect memcached instance ip change
---
.../net/spy/memcached/protocol/TCPMemcachedNodeImpl.java | 11 ++++++++++-
.../cachemanager/RemoteLocalFailOverCacheManager.java | 5 +++++
.../cachemanager/RemoteLocalFailOverCacheManagerTest.java | 14 ++++++++++++++
3 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
index 22dd730..f7da57e 100644
--- a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
+++ b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -19,10 +19,12 @@
package net.spy.memcached.protocol;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.nio.channels.UnsupportedAddressTypeException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
@@ -42,6 +44,9 @@ import net.spy.memcached.protocol.binary.TapAckOperationImpl;
/**
* Represents a node with the memcached cluster, along with buffering and
* operation queues.
+ *
+ * This is a modified version of the net.spy.memcached.protocol.TCPMemcachedNodeImpl
+ * Override the final method getSocketAddress() to refresh SocketAddress to achieve same hostname with ip changing
*/
public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode {
@@ -415,7 +420,11 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements Memcache
* @see net.spy.memcached.MemcachedNode#getSocketAddress()
*/
public final SocketAddress getSocketAddress() {
- return socketAddress;
+ if (!(socketAddress instanceof InetSocketAddress)) {
+ throw new UnsupportedAddressTypeException();
+ }
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+ return new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
}
/*
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
index 22517f4..aae0d7c 100644
--- a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
@@ -68,4 +68,9 @@ public class RemoteLocalFailOverCacheManager extends AbstractCacheManager {
void enableRemoteCacheManager() {
remoteCacheManager.setClusterHealth(true);
}
+
+ @VisibleForTesting
+ MemcachedCacheManager getRemoteCacheManager() {
+ return remoteCacheManager;
+ }
}
\ No newline at end of file
diff --git a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
index 243e386..c45dd6f 100644
--- a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
+++ b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
@@ -32,6 +32,8 @@ import org.springframework.cache.ehcache.EhCacheCache;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+//
+//import net.spy.memcached.MemcachedClientIF;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:cacheContext.xml" })
@@ -58,5 +60,17 @@ public class RemoteLocalFailOverCacheManagerTest {
cacheManager.enableRemoteCacheManager();
Assert.assertTrue("Memcached enabled",
cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor);
+//
+// MemcachedCacheManager remoteCacheManager = cacheManager.getRemoteCacheManager();
+// for (int i = 0; i < 1000; i++) {
+// MemcachedClientIF client = (MemcachedClientIF) remoteCacheManager.getCache(QUERY_CACHE).getNativeCache();
+// System.out.println(i + " available servers: " + client.getAvailableServers() + "; unavailable servers: "
+// + client.getUnavailableServers());
+// try {
+// client.get("key");
+// Thread.sleep(2000L);
+// } catch (Exception e) {
+// }
+// }
}
}
\ No newline at end of file